
lease: remove code from 20.1->20.2 migration #62579

Merged
2 changes: 1 addition & 1 deletion pkg/server/server_sql.go
@@ -759,7 +759,7 @@ func (s *SQLServer) preStart(
mmKnobs = *migrationManagerTestingKnobs.(*sqlmigrations.MigrationManagerTestingKnobs)
}

s.leaseMgr.RefreshLeases(ctx, stopper, s.execCfg.DB, s.execCfg.Gossip)
s.leaseMgr.RefreshLeases(ctx, stopper, s.execCfg.DB)
s.leaseMgr.PeriodicallyRefreshSomeLeases(ctx)

// Only start the sqlliveness subsystem if we're already at the cluster
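For context, a minimal sketch of the new call-site shape, assuming the cockroach packages that appear in this diff; the wrapper function and package name below are placeholders, not code from this PR:

```go
// Sketch only: illustrates the simplified wiring during SQL server startup.
package leasewiring

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// startLeaseRefresh shows the post-change call: RefreshLeases no longer takes
// a gossip.OptionalGossip handle, only the stopper and the KV client.
func startLeaseRefresh(ctx context.Context, stopper *stop.Stopper, leaseMgr *lease.Manager, db *kv.DB) {
	leaseMgr.RefreshLeases(ctx, stopper, db) // previously also took s.execCfg.Gossip
	leaseMgr.PeriodicallyRefreshSomeLeases(ctx)
}
```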
3 changes: 0 additions & 3 deletions pkg/sql/catalog/lease/BUILD.bazel
@@ -10,8 +10,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed",
@@ -22,7 +20,6 @@
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
214 changes: 6 additions & 208 deletions pkg/sql/catalog/lease/lease.go
@@ -23,8 +23,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
@@ -35,7 +33,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
@@ -1140,10 +1137,6 @@ type ManagerTestingKnobs struct {
// To disable the deletion of orphaned leases at server startup.
DisableDeleteOrphanedLeases bool

// AlwaysUseRangefeeds ensures that rangefeeds and not gossip are used to
// detect changes to descriptors.
AlwaysUseRangefeeds bool

// VersionPollIntervalForRangefeeds controls the polling interval for the
// check whether the requisite version for rangefeed-based notifications has
// been finalized.
@@ -1679,21 +1672,9 @@ func (m *Manager) findDescriptorState(id descpb.ID, create bool) *descriptorStat
// leases for descriptors received in the latest system configuration via gossip or
// rangefeeds. This function must be passed a non-nil gossip if
// RangefeedLeases is not active.
func (m *Manager) RefreshLeases(
ctx context.Context, s *stop.Stopper, db *kv.DB, g gossip.OptionalGossip,
) {
// TODO(ajwerner): is this task needed? refreshLeases appears to already
// delegate everything to a goroutine.
_ = s.RunAsyncTask(ctx, "refresh-leases", func(ctx context.Context) {
m.refreshLeases(ctx, g, db, s)
})
}

func (m *Manager) refreshLeases(
ctx context.Context, g gossip.OptionalGossip, db *kv.DB, s *stop.Stopper,
) {
func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB) {
descUpdateCh := make(chan *descpb.Descriptor)
m.watchForUpdates(ctx, s, db, g, descUpdateCh)
m.watchForUpdates(ctx, descUpdateCh)
_ = s.RunAsyncTask(ctx, "refresh-leases", func(ctx context.Context) {
for {
select {
@@ -1733,91 +1714,9 @@ func (m *Manager) refreshLeases(
})
}

// watchForUpdates will watch either gossip or rangefeeds for updates. If the
// version does not currently support rangefeeds, gossip will be used until
// rangefeeds are supported, at which time, the system will shut down the
// gossip listener and start using rangefeeds.
func (m *Manager) watchForUpdates(
ctx context.Context,
s *stop.Stopper,
db *kv.DB,
g gossip.OptionalGossip,
descUpdateCh chan *descpb.Descriptor,
) {
useRangefeeds := m.testingKnobs.AlwaysUseRangefeeds ||
m.storage.settings.Version.IsActive(ctx, clusterversion.RangefeedLeases)
if useRangefeeds {
m.watchForRangefeedUpdates(ctx, s, db, descUpdateCh)
return
}
gossipCtx, cancelWatchingGossip := context.WithCancel(ctx)
m.watchForGossipUpdates(gossipCtx, s, g, descUpdateCh)
canUseRangefeedsCh := m.waitForRangefeedsToBeUsable(ctx, s)
if err := s.RunAsyncTask(ctx, "wait for upgrade", func(ctx context.Context) {
select {
case <-s.ShouldQuiesce():
return
case <-canUseRangefeedsCh:
// Note: It's okay that the cancelation of gossip watching is
// asynchronous. At worst we'd get duplicate updates or stale updates.
// Both of those are handled.
cancelWatchingGossip()
// Note: It's safe to start watching for rangefeeds now. We know that all
// nodes support rangefeeds in the system config span. Even though there
// may not have been logical ops for all operations in the log, the
// catch-up scan should take us up to the present.
//
// When the rangefeed starts up we'll pass it an initial timestamp which
// is no newer than all updates to the system config span we've already
// seen (see setResolvedTimestamp and its callers). The rangefeed API
// ensures that we will see all updates from on or before that timestamp
// at least once.
m.watchForRangefeedUpdates(ctx, s, db, descUpdateCh)
}
}); err != nil {
// Note: this can only happen if the stopper has been stopped.
return
}
}

func (m *Manager) watchForGossipUpdates(
ctx context.Context,
s *stop.Stopper,
g gossip.OptionalGossip,
descUpdateCh chan<- *descpb.Descriptor,
) {
rawG, err := g.OptionalErr(47150)
if err != nil {
if v := clusterversion.RangefeedLeases; !m.storage.settings.Version.IsActive(ctx, v) {
log.Fatalf(ctx, "required gossip until %v is active: %v", clusterversion.RangefeedLeases, err)
}
return
}

_ = s.RunAsyncTask(ctx, "gossip-updates", func(ctx context.Context) {
descKeyPrefix := m.storage.codec.TablePrefix(uint32(systemschema.DescriptorTable.GetID()))
// TODO(ajwerner): Add a mechanism to unregister this channel upon
// return. NB: this call is allowed to bypass OptionalGossip because
// we'll never get here after RangefeedLeases.
gossipUpdateC := rawG.RegisterSystemConfigChannel()
filter := gossip.MakeSystemConfigDeltaFilter(descKeyPrefix)

ctx, cancel := s.WithCancelOnQuiesce(ctx)
defer cancel()
for {
select {
case <-gossipUpdateC:
m.handleUpdatedSystemCfg(ctx, rawG, &filter, descUpdateCh)
case <-s.ShouldQuiesce():
return
}
}
})
}

func (m *Manager) watchForRangefeedUpdates(
ctx context.Context, s *stop.Stopper, db *kv.DB, descUpdateCh chan<- *descpb.Descriptor,
) {
// watchForUpdates will watch a rangefeed on the system.descriptor table for
// updates.
func (m *Manager) watchForUpdates(ctx context.Context, descUpdateCh chan<- *descpb.Descriptor) {
if log.V(1) {
log.Infof(ctx, "using rangefeeds for lease manager updates")
}
@@ -1856,111 +1755,10 @@ func (m *Manager) watchForRangefeedUpdates(
// Also note that the range feed automatically shuts down when the server
// shuts down, so we don't need to call Close() ourselves.
_, _ = m.rangeFeedFactory.RangeFeed(
ctx, "lease", descriptorTableSpan, m.getResolvedTimestamp(), handleEvent,
ctx, "lease", descriptorTableSpan, hlc.Timestamp{}, handleEvent,
)
}

func (m *Manager) handleUpdatedSystemCfg(
ctx context.Context,
rawG *gossip.Gossip,
cfgFilter *gossip.SystemConfigDeltaFilter,
descUpdateCh chan<- *descpb.Descriptor,
) {
cfg := rawG.GetSystemConfig()
// Read all descriptors and their versions
if log.V(2) {
log.Info(ctx, "received a new config; will refresh leases")
}
var latestTimestamp hlc.Timestamp
cfgFilter.ForModified(cfg, func(kv roachpb.KeyValue) {
// Attempt to unmarshal config into a descriptor.
var descriptor descpb.Descriptor
if latestTimestamp.Less(kv.Value.Timestamp) {
latestTimestamp = kv.Value.Timestamp
}
if err := kv.Value.GetProto(&descriptor); err != nil {
log.Warningf(ctx, "%s: unable to unmarshal descriptor %v", kv.Key, kv.Value)
return
}
if descriptor.Union == nil {
return
}
descpb.MaybeSetDescriptorModificationTimeFromMVCCTimestamp(&descriptor, kv.Value.Timestamp)
id, version, name, _ := descpb.GetDescriptorMetadata(&descriptor)
if log.V(2) {
log.Infof(ctx, "%s: refreshing lease on descriptor: %d (%s), version: %d",
kv.Key, id, name, version)
}
select {
case <-ctx.Done():
case descUpdateCh <- &descriptor:
}
})
if !latestTimestamp.IsEmpty() {
m.setResolvedTimestamp(latestTimestamp)
}
// Attempt to shove a nil descriptor into the channel to ensure that
// we've processed all of the events previously sent.
select {
case <-ctx.Done():
// If we've been canceled, the other side of the channel will also have
// been canceled.
case descUpdateCh <- nil:
}
}

// waitForRangefeedsToBeUsable returns a channel which is closed when rangefeeds
// are usable according to the cluster version.
func (m *Manager) waitForRangefeedsToBeUsable(ctx context.Context, s *stop.Stopper) chan struct{} {
// TODO(ajwerner): Add a callback to notify about version changes.
// Checking is pretty cheap but really this should be a callback.
const defaultCheckInterval = 10 * time.Second
checkInterval := defaultCheckInterval
if m.testingKnobs.VersionPollIntervalForRangefeeds != 0 {
checkInterval = m.testingKnobs.VersionPollIntervalForRangefeeds
}
upgradeChan := make(chan struct{})
timer := timeutil.NewTimer()
timer.Reset(0)
// NB: we intentionally do *not* close upgradeChan if the task never starts.
_ = s.RunAsyncTask(ctx, "wait-rangefeed-version", func(ctx context.Context) {
for {
select {
case <-timer.C:
timer.Read = true
if m.storage.settings.Version.IsActive(ctx, clusterversion.RangefeedLeases) {
close(upgradeChan)
return
}
timer.Reset(checkInterval)
case <-ctx.Done():
return
case <-s.ShouldQuiesce():
return
}
}
})
return upgradeChan
}

// setResolvedTimestamp marks the Manager as having processed all updates
// up to this timestamp. It is set under the gossip path based on the highest
// timestamp seen in a system config and under the rangefeed path when a
// resolved timestamp is received.
func (m *Manager) setResolvedTimestamp(ts hlc.Timestamp) {
m.mu.Lock()
defer m.mu.Unlock()
if m.mu.updatesResolvedTimestamp.Less(ts) {
m.mu.updatesResolvedTimestamp = ts
}
}

func (m *Manager) getResolvedTimestamp() hlc.Timestamp {
m.mu.Lock()
defer m.mu.Unlock()
return m.mu.updatesResolvedTimestamp
}

// leaseRefreshLimit is the upper-limit on the number of descriptor leases
// that will continuously have their lease refreshed.
var leaseRefreshLimit = settings.RegisterIntSetting(
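To make the surviving mechanism concrete, here is a minimal, hypothetical sketch of the per-descriptor decode step the update handler performs; the decode calls are the ones visible in the diff above, but the helper name, parameter type, and channel wiring are illustrative rather than the Manager's actual internals:

```go
// Sketch only: decode one raw system.descriptor KV and hand it to the refresh loop.
package leasewatch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

func handleDescriptorKV(
	ctx context.Context, kv roachpb.KeyValue, descUpdateCh chan<- *descpb.Descriptor,
) {
	var descriptor descpb.Descriptor
	if err := kv.Value.GetProto(&descriptor); err != nil {
		log.Warningf(ctx, "%s: unable to unmarshal descriptor %v", kv.Key, kv.Value)
		return
	}
	if descriptor.Union == nil {
		// An empty union means there is no descriptor to refresh.
		return
	}
	// Stamp the descriptor with the MVCC timestamp of the write that produced it.
	descpb.MaybeSetDescriptorModificationTimeFromMVCCTimestamp(&descriptor, kv.Value.Timestamp)
	select {
	case <-ctx.Done():
	case descUpdateCh <- &descriptor:
	}
}
```

With the gossip-to-rangefeed handover removed there is no resolved timestamp to carry over, which is why the rangefeed above is now started from an empty `hlc.Timestamp{}` and the setResolvedTimestamp/getResolvedTimestamp bookkeeping could be deleted.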
8 changes: 1 addition & 7 deletions pkg/sql/catalog/lease/lease_test.go
@@ -2204,13 +2204,7 @@ func TestRangefeedUpdatesHandledProperlyInTheFaceOfRaces(t *testing.T) {
interestingTable.Store(descpb.ID(0))
blockLeaseAcquisitionOfInterestingTable := make(chan chan struct{})
unblockAll := make(chan struct{})
args := base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLLeaseManager: &lease.ManagerTestingKnobs{
AlwaysUseRangefeeds: true,
},
},
}
args := base.TestServerArgs{}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: args,
})
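For completeness, a hypothetical minimal version of the simplified test setup; the helper name is a placeholder. Because rangefeeds are now the only update path, the test no longer needs any lease-manager knobs:

```go
// Sketch only: start a single-node test cluster with default server args.
package lease_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func startSingleNodeCluster(t *testing.T) *testcluster.TestCluster {
	args := base.TestServerArgs{}
	// No SQLLeaseManager knobs: AlwaysUseRangefeeds is gone because rangefeeds
	// are now unconditional. The caller should defer Stopper().Stop(ctx) on the
	// returned cluster as usual.
	return testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
		ServerArgs: args,
	})
}
```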