131951: changefeedccl: reduce use of errors during pauseOrResumePolling r=stevendanna a=wenyihu6

Previously, errors were constructed and compared in pauseOrResumePolling solely
to indicate that polling cannot be paused and that a periodic table history scan
is necessary. However, no actual errors were returned from pauseOrResumePolling
in those cases. This error handling introduced unnecessary CPU overhead in this
hot path, as observed during escalations and DRT scale testing. This patch
improves performance by replacing error-based communication with a boolean
check.
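
The idea can be illustrated with a minimal, self-contained sketch. It is not the changefeed code: `errNotLocked`, `canPauseWithErrors`, `canPauseWithBool`, and the `locked` map are hypothetical names invented for this example. The first function builds an error only to signal "stop, polling cannot be paused"; the second carries the same signal in a boolean and reserves the error for real failures.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotLocked is a hypothetical sentinel used only as a "cannot pause" signal.
var errNotLocked = errors.New("table not schema-locked")

// canPauseWithErrors signals "not locked" by constructing an error in the
// callback and classifying it afterwards with errors.Is.
func canPauseWithErrors(ids []int, locked map[int]bool) (bool, error) {
	check := func(id int) error {
		if !locked[id] {
			// Allocates and formats an error on every miss.
			return fmt.Errorf("desc %d: %w", id, errNotLocked)
		}
		return nil
	}
	for _, id := range ids {
		if err := check(id); err != nil {
			if errors.Is(err, errNotLocked) {
				return false, nil // a signal, not a real failure
			}
			return false, err
		}
	}
	return true, nil
}

// canPauseWithBool carries the same signal in a bool. The error return is
// reserved for real failures, so nothing is allocated for the common case.
func canPauseWithBool(ids []int, locked map[int]bool) (bool, error) {
	check := func(id int) (bool, error) {
		return locked[id], nil
	}
	for _, id := range ids {
		if ok, err := check(id); !ok {
			return false, err
		}
	}
	return true, nil
}

func main() {
	ids := []int{1, 2, 3}
	locked := map[int]bool{1: true, 2: false, 3: true}

	ok, err := canPauseWithErrors(ids, locked)
	fmt.Printf("error-based: canPause=%t err=%v\n", ok, err)

	ok, err = canPauseWithBool(ids, locked)
	fmt.Printf("bool-based:  canPause=%t err=%v\n", ok, err)
}
```

Both variants reach the same answer; the boolean variant simply avoids building and inspecting an error whose only job is control flow.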

Fixes: cockroachdb#131327
Release note (performance improvement): Enhanced performance when schema_locked
is not in use by improving error handling during periodic table history polling.

```
PauseOrResumePolling-8   16.796µ ± 1% sec/op    3.704µ ± 3% sec/op  -77.95% (p=0.000 n=10)
PauseOrResumePolling-8   7.098Ki ± 0% B/op    2.027Ki ± 0% B/op  -71.44% (p=0.000 n=10)
PauseOrResumePolling-8   145.00 ± 0% allocs/op    36.00 ± 0% allocs/op  -75.17% (p=0.000 n=10)
```
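
The column headers of the benchmark output above did not survive; assuming the usual benchstat layout (first value before the patch, second value after), the reported deltas can be reproduced with a short sketch. `percentChange` is a helper introduced here purely for illustration.

```go
package main

import "fmt"

// percentChange returns the relative change from before to after, in percent.
func percentChange(before, after float64) float64 {
	return (after - before) / before * 100
}

func main() {
	// Values copied from the benchmark output; the before/after reading of
	// the two columns is an assumption.
	fmt.Printf("sec/op:    %.2f%%\n", percentChange(16.796, 3.704))
	fmt.Printf("B/op:      %.2f%%\n", percentChange(7.098, 2.027))
	fmt.Printf("allocs/op: %.2f%%\n", percentChange(145, 36))
}
```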

132177: server/license: Change how trial license usage is tracked r=rafiss a=spilchen

There’s a race condition when updating the enterprise.license config setting and checking the trial usage count. If a node starts up while a new trial license is being applied, it can encounter an issue where it sees the updated trial usage count in KV before receiving the corresponding enterprise.license config setting. This causes the license update to be rejected, as it incorrectly assumes a trial license has already been used.

This change addresses the issue by modifying what is stored in KV for the trial license. Instead of tracking the number of trial licenses used, which would only ever be 0 or 1, we now store the expiry timestamp of any active or past trial license. The enterprise.license validation function will compare the expiry of the new license against the cached value from KV. If the expiry timestamp is not set or matches the expiry of the new license, the validation will proceed. Otherwise, it will fail as before.
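
A minimal sketch of that validation rule, assuming a standalone program rather than the actual settings hook: `cachedTrialExpiry`, `errTrialAlreadyUsed`, and `validateTrialLicense` are hypothetical names, and the timestamps are made-up Unix seconds.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// cachedTrialExpiry stands in for the in-memory copy of the value kept in KV:
// the expiry (Unix seconds) of any trial license seen so far, or 0 if none.
var cachedTrialExpiry atomic.Int64

var errTrialAlreadyUsed = errors.New(
	"a trial license has previously been installed on this cluster")

// validateTrialLicense accepts a trial license when no trial expiry is
// recorded yet, or when the recorded expiry matches the new license.
func validateTrialLicense(newExpiryUnixSec int64) error {
	cached := cachedTrialExpiry.Load()
	if cached > 0 && cached != newExpiryUnixSec {
		return errTrialAlreadyUsed
	}
	return nil
}

func main() {
	const firstTrial, secondTrial = int64(946684800), int64(946771200)

	// No trial recorded yet: the license is accepted.
	fmt.Println(validateTrialLicense(firstTrial))

	// A starting node sees the KV update before the setting arrives. The
	// expiry matches, so the same license is still accepted.
	cachedTrialExpiry.Store(firstTrial)
	fmt.Println(validateTrialLicense(firstTrial))

	// A different trial license is rejected.
	fmt.Println(validateTrialLicense(secondTrial))
}
```

With a usage counter, the second call would have been rejected because the count was already 1; comparing expiries lets the node recognize the license as the one that was already recorded.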

This change will be backported to 24.2, 24.1, 23.2 and 23.1.

Epic: CRDB-39988
Closes cockroachdb#131968
Release note: none

132189: workflows: delete add issue to project workflow file r=rail a=rickystewart

This use case is subsumed by project workflows ([ref](https://docs.github.com/en/issues/planning-and-tracking-with-projects/automating-your-project/using-the-built-in-automations)).

Part of: DEVINF-1313
Epic: DEVINF-1006
Release note: None

Co-authored-by: Wenyi Hu <[email protected]>
Co-authored-by: Matt Spilchen <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
4 people committed Oct 8, 2024
4 parents cf9f8d4 + f701074 + 71f7b2a + 783972c commit 9666a73
Showing 9 changed files with 162 additions and 119 deletions.
40 changes: 0 additions & 40 deletions .github/workflows/add-issues-to-project.yml

This file was deleted.

13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/target.go
@@ -110,6 +110,19 @@ func (ts *Targets) EachTableID(f func(descpb.ID) error) error {
return nil
}

// EachTableIDWithBool is similar to EachTableID but avoids using
// iterutil.Map(err) to eliminate the overhead of errors.Is. Thus, f should not
// return iterutil.StopIteration(). It returns false, along with any error from
// f, as soon as the callback f returns false; it returns true once the
// iteration completes.
func (ts *Targets) EachTableIDWithBool(f func(descpb.ID) (bool, error)) (bool, error) {
for id := range ts.m {
if b, err := f(id); !b {
return false, err
}
}
return true, nil
}

// EachHavingTableID iterates over each Target with the given id, returning
// false if there were none.
func (ts *Targets) EachHavingTableID(id descpb.ID, f func(Target) error) (bool, error) {
26 changes: 16 additions & 10 deletions pkg/ccl/changefeedccl/schemafeed/schema_feed.go
@@ -480,36 +480,42 @@ func (tf *schemaFeed) pauseOrResumePolling(ctx context.Context, atOrBefore hlc.T
// Always assume we need to resume polling until we've proven otherwise.
tf.mu.pollingPaused = false

if err := tf.targets.EachTableID(func(id descpb.ID) error {
if canPausePolling, err := tf.targets.EachTableIDWithBool(func(id descpb.ID) (bool, error) {
// Check if target table is schema-locked at the current frontier.
ld1, err := tf.leaseMgr.Acquire(ctx, frontier, id)
if err != nil {
return err
return false, err
}
defer ld1.Release(ctx)
desc1 := ld1.Underlying().(catalog.TableDescriptor)
if !desc1.IsSchemaLocked() {
return errors.Newf("desc %d not schema-locked at frontier %s", desc1.GetID(), frontier)
if log.V(2) {
log.Infof(ctx, "desc %d not schema-locked at frontier %s", desc1.GetID(), frontier)
}
return false, nil
}

if atOrBefore.LessEq(frontier) {
return nil
return true, nil
}

// Check if target table remains at the same version at atOrBefore.
ld2, err := tf.leaseMgr.Acquire(ctx, atOrBefore, id)
if err != nil {
return err
return false, err
}
defer ld2.Release(ctx)
desc2 := ld2.Underlying().(catalog.TableDescriptor)
if desc1.GetVersion() != desc2.GetVersion() {
return errors.Newf("desc %d version changed from version %d to %d between frontier %s and atOrBefore %s",
desc1.GetID(), desc1.GetVersion(), desc2.GetVersion(), frontier, atOrBefore)
if log.V(1) {
log.Infof(ctx,
"desc %d version changed from version %d to %d between frontier %s and atOrBefore %s",
desc1.GetID(), desc1.GetVersion(), desc2.GetVersion(), frontier, atOrBefore)
}
return false, nil
}

return nil
}); err != nil {
return true, nil
}); !canPausePolling || err != nil {
if errors.Is(err, catalog.ErrDescriptorDropped) {
// If a table is dropped and causes Acquire to fail, we mark it as a
// terminal error, so that we don't retry, and let the changefeed job
63 changes: 63 additions & 0 deletions pkg/ccl/changefeedccl/schemafeed/schema_feed_test.go
@@ -495,3 +495,66 @@ func TestPauseOrResumePolling(t *testing.T) {
require.False(t, sf.pollingPaused())
require.Equal(t, hlc.Timestamp{WallTime: 50}, getFrontier())
}

// BenchmarkPauseOrResumePolling benchmarks pauseOrResumePolling in cases where
// there is a non-terminal error early, polling should be paused, and polling
// should not be paused.
func BenchmarkPauseOrResumePolling(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)

ctx := context.Background()

const tableID = 123
sf := schemaFeed{
leaseMgr: &testLeaseAcquirer{
id: tableID,
descs: []*testLeasedDescriptor{
newTestLeasedDescriptor(tableID, 1, false, hlc.Timestamp{WallTime: 30}),
newTestLeasedDescriptor(tableID, 2, true, hlc.Timestamp{WallTime: 40}),
},
},
targets: CreateChangefeedTargets(tableID),
}
setFrontier := func(ts hlc.Timestamp) error {
sf.mu.Lock()
defer sf.mu.Unlock()
return sf.mu.ts.advanceFrontier(ts)
}

// Set the initial frontier to 10.
require.NoError(b, setFrontier(hlc.Timestamp{WallTime: 10}))
// Initially, polling should not be paused.
require.False(b, sf.pollingPaused())

b.Run("non-terminal error", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
// We expect a non-terminal error to be swallowed for time 10 since a
// valid descriptor does not exist for time 10.
require.NoError(b, sf.pauseOrResumePolling(ctx, hlc.Timestamp{WallTime: 10}))
}
require.False(b, sf.pollingPaused())
})
b.Run("not schema locked", func(b *testing.B) {
// We bump the highwater up to reflect a descriptor being read at time 30.
require.NoError(b, setFrontier(hlc.Timestamp{WallTime: 30}))
b.ResetTimer()
for i := 0; i < b.N; i++ {
// We do not expect polling to be paused for time 30 since the descriptor
// at time 30 is not schema locked.
require.NoError(b, sf.pauseOrResumePolling(ctx, hlc.Timestamp{WallTime: 30}))
}
require.False(b, sf.pollingPaused())
})
b.Run("schema locked", func(b *testing.B) {
// We bump the highwater up to reflect a descriptor being read at time 50.
require.NoError(b, setFrontier(hlc.Timestamp{WallTime: 50}))
for i := 0; i < b.N; i++ {
// We expect polling to be paused for time 50 now that the highwater is on a
// schema-locked version.
require.NoError(b, sf.pauseOrResumePolling(ctx, hlc.Timestamp{WallTime: 50}))
}
require.True(b, sf.pollingPaused())
})
}
15 changes: 8 additions & 7 deletions pkg/ccl/utilccl/license_check.go
@@ -26,9 +26,9 @@ import (
"github.com/cockroachdb/redact"
)

// trialLicenseUsageCount keeps track of the number of times a free trial
// license has already been installed on this cluster.
var trialLicenseUsageCount atomic.Int64
// trialLicenseExpiryTimestamp tracks the expiration timestamp of any trial licenses
// that have been installed on this cluster (past or present).
var trialLicenseExpiryTimestamp atomic.Int64

var enterpriseLicense = settings.RegisterStringSetting(
settings.SystemVisible,
@@ -45,7 +45,8 @@ var enterpriseLicense = settings.RegisterStringSetting(
return nil
}

if l.Type == licenseccl.License_Trial && trialLicenseUsageCount.Load() > 0 {
if l.Type == licenseccl.License_Trial && trialLicenseExpiryTimestamp.Load() > 0 &&
l.ValidUntilUnixSec != trialLicenseExpiryTimestamp.Load() {
return errors.WithHint(errors.Newf("a trial license has previously been installed on this cluster"),
"Please install a non-trial license to continue")
}
@@ -378,12 +379,12 @@ func RegisterCallbackOnLicenseChange(
}
licenseEnforcer.RefreshForLicenseChange(ctx, licenseType, licenseExpiry)

cnt, err := licenseEnforcer.CalculateTrialUsageCount(ctx, licenseType, isChange)
expiry, err := licenseEnforcer.UpdateTrialLicenseExpiry(ctx, licenseType, isChange, licenseExpiry.Unix())
if err != nil {
log.Errorf(ctx, "unable to calculate trial license usage count: %v", err)
log.Errorf(ctx, "unable to update trial license expiry: %v", err)
return
}
trialLicenseUsageCount.Store(cnt)
trialLicenseExpiryTimestamp.Store(expiry)
}
// Install the hook so that we refresh license details when the license changes.
enterpriseLicense.SetOnChange(&st.SV,
22 changes: 12 additions & 10 deletions pkg/ccl/utilccl/license_check_test.go
@@ -434,15 +434,17 @@ func TestRefreshLicenseEnforcerOnLicenseChange(t *testing.T) {
{[]string{"crl-0-EMDYt8MDGAEiDkNSREIgVW5pdCBUZXN0KAM"}, "", timeutil.UnixEpoch},
// No license - 7 days grace period
{[]string{""}, "", ts1.Add(30 * 24 * time.Hour)},
// Only 1 trial license allowed
// Two trial licenses are allowed if they both have the same expiry
{[]string{"crl-0-EMDYt8MDGAQiDkNSREIgVW5pdCBUZXN0", "crl-0-EMDYt8MDGAQiDkNSREIgVW5pdCBUZXN0"},
"", jan1st2000.Add(7 * 24 * time.Hour)},
// A second trial license is not allowed if it has a different expiry (Jan 1st 2000 8:01 AST)
{[]string{"crl-0-EMDYt8MDGAQiDkNSREIgVW5pdCBUZXN0", "crl-0-EPzYt8MDGAQiDkNSREIgVW5pdCBUZXN0"},
"a trial license has previously been installed on this cluster", timeutil.UnixEpoch},
} {
t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) {
// Reset from prior test unit.
cnt, err := enforcer.SetTrialUsageCount(ctx, 0, false /* checkOldCount */)
err := enforcer.TestingResetTrialUsage(ctx)
require.NoError(t, err)
require.Equal(t, int64(0), cnt)

tdb := sqlutils.MakeSQLRunner(sqlDB)

@@ -452,17 +454,17 @@ func TestRefreshLicenseEnforcerOnLicenseChange(t *testing.T) {
tdb.Exec(t, sql)

// If installing a trial license, we need to wait for the callback to
// bump the count before continuing. We depend on the count to cause an
// bump the expiry before continuing. We depend on the expiry to cause an
// error if another trial license is installed.
l, err := decode(tc.licenses[i])
require.NoError(t, err)
if l.Type == licenseccl.License_Trial {
var cnt int64
var expiry int64
require.Eventually(t, func() bool {
cnt = trialLicenseUsageCount.Load()
return cnt > 0
expiry = trialLicenseExpiryTimestamp.Load()
return expiry > 0
}, 20*time.Second, time.Millisecond,
"trialLicenseUsageCount last returned %d", cnt)
"trialLicenseExpiryTimestamp last returned %d", expiry)
}
}

@@ -484,7 +486,7 @@ func TestRefreshLicenseEnforcerOnLicenseChange(t *testing.T) {
hasLicense = enforcer.GetHasLicense()
return (lastLicense != "") == hasLicense
}, 20*time.Second, time.Millisecond,
"GetHasLicense() last returned %t", hasLicense)
"GetHasLicense() did not return hasLicense of %t in time", lastLicense != "")
var ts time.Time
var hasGracePeriod bool
require.Eventually(t, func() bool {
@@ -494,7 +496,7 @@ func TestRefreshLicenseEnforcerOnLicenseChange(t *testing.T) {
}
return ts.Equal(tc.expectedGracePeriodEnd)
}, 20*time.Second, time.Millisecond,
"GetGracePeriodEndTS() last returned %v (%t)", ts, hasGracePeriod)
"GetGracePeriodEndTS() did not return grace period of %s in time", tc.expectedGracePeriodEnd.String())
})
}
}
5 changes: 2 additions & 3 deletions pkg/keys/constants.go
@@ -300,9 +300,8 @@ var (
// set during cluster initialization, by which a license must be installed to avoid
// throttling. The value is stored as the number of seconds since the Unix epoch.
ClusterInitGracePeriodTimestamp = roachpb.Key(makeKey(SystemPrefix, roachpb.RKey("lic-gpi-ts")))
// TrialLicenseUsageCount is used to keep track of the number of times a trial
// license was installed on the cluster.
TrialLicenseUsageCount = roachpb.Key(makeKey(SystemPrefix, roachpb.RKey("lic-tluc")))
// TrialLicenseExpiry is used to track the expiry of any trial license (past or present)
TrialLicenseExpiry = roachpb.Key(makeKey(SystemPrefix, roachpb.RKey("lic-tle")))
//
// NodeIDGenerator is the global node ID generator sequence.
NodeIDGenerator = roachpb.Key(makeKey(SystemPrefix, roachpb.RKey("node-idgen")))
2 changes: 1 addition & 1 deletion pkg/keys/doc.go
@@ -244,7 +244,7 @@ var _ = [...]interface{}{
NodeLivenessPrefix, // "\x00liveness-"
BootstrapVersionKey, // "bootstrap-version"
ClusterInitGracePeriodTimestamp, // "lic-gpi-ts"
TrialLicenseUsageCount, // "lic-tluc"
TrialLicenseExpiry, // "lic-tle"
NodeIDGenerator, // "node-idgen"
RangeIDGenerator, // "range-idgen"
StatusPrefix, // "status-"