Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
130263: server/license: Integrate license throttle with SQL r=fqazi a=spilchen

Prior to this, the throttling code had been implemented but wasn’t integrated with SQL. This change completes that integration, applying throttling only to external SQL queries—internal SQL queries are exempt. If throttling is active, an error is returned and propagated to the caller.

For licenses requiring telemetry, the latest telemetry ping time is used in the throttling calculation by implementing the
license.TelemetryStatusReporter interface.

Most throttling functionality remains disabled, enabled only in unit tests or via environment variables. Full throttling will be enabled in CRDB-41758.

Epic: CRDB-39988
Closes: CRDB-39991
Release note: None

130526: roachtest: use default max_index_buffer_size in backup/mvcc-range-tombstones r=kev-cao a=msbutler

To avoid OOMs seen in cockroachdb#130233.

Fixes cockroachdb#130233

Release note: none

Co-authored-by: Matt Spilchen <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
3 people committed Sep 11, 2024
3 parents d7614a6 + 8ef9d81 + 1669f1e commit 67e99eb
Show file tree
Hide file tree
Showing 11 changed files with 247 additions and 54 deletions.
6 changes: 4 additions & 2 deletions pkg/ccl/utilccl/license_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,9 @@ func check(l *licenseccl.License, at time.Time, org, feature string, withDetails

// RegisterCallbackOnLicenseChange will register a callback to update the
// license enforcer whenever the license changes.
func RegisterCallbackOnLicenseChange(ctx context.Context, st *cluster.Settings) {
func RegisterCallbackOnLicenseChange(
ctx context.Context, st *cluster.Settings, licenseEnforcer *licenseserver.Enforcer,
) {
refreshFunc := func(ctx context.Context) {
lic, err := getLicense(st)
if err != nil {
Expand All @@ -336,7 +338,7 @@ func RegisterCallbackOnLicenseChange(ctx context.Context, st *cluster.Settings)
licenseType = licenseserver.LicTypeEnterprise
}
}
licenseserver.GetEnforcerInstance().RefreshForLicenseChange(licenseType, licenseExpiry)
licenseEnforcer.RefreshForLicenseChange(ctx, licenseType, licenseExpiry)
}
// Install the hook so that we refresh license details when the license changes.
enterpriseLicense.SetOnChange(&st.SV, refreshFunc)
Expand Down
4 changes: 1 addition & 3 deletions pkg/cmd/roachtest/tests/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,9 +629,7 @@ func runBackupMVCCRangeTombstones(

// Configure cluster.
t.Status("configuring cluster")
_, err := conn.Exec(`SET CLUSTER SETTING kv.bulk_ingest.max_index_buffer_size = '2gb'`)
require.NoError(t, err)
_, err = conn.Exec(`SET CLUSTER SETTING server.debug.default_vmodule = 'txn=2,sst_batcher=4,revert=2'`)
_, err := conn.Exec(`SET CLUSTER SETTING server.debug.default_vmodule = 'txn=2,sst_batcher=4,revert=2'`)
require.NoError(t, err)
// Wait for ranges to upreplicate.
require.NoError(t, WaitFor3XReplication(ctx, t, t.L(), conn))
Expand Down
7 changes: 7 additions & 0 deletions pkg/server/diagnostics/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ func (r *Reporter) ReportDiagnostics(ctx context.Context) {
r.SQLServer.GetReportedSQLStatsController().ResetLocalSQLStats(ctx)
}

// GetLastSuccessfulTelemetryPing will return the timestamp of when we last got
// a ping back from the registration server.
func (r *Reporter) GetLastSuccessfulTelemetryPing() time.Time {
ts := timeutil.Unix(r.LastSuccessfulTelemetryPing.Load(), 0)
return ts
}

// CreateReport generates a new diagnostics report containing information about
// the current node or tenant.
func (r *Reporter) CreateReport(
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/license/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ go_test(
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog/descs",
"//pkg/sql/isql",
"//pkg/sql/sessiondata",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/license/cclbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// RegisterCallbackOnLicenseChange is a pointer to a function that will register
// a callback when the license changes. This is initially empty here. When
// initializing the ccl package, this variable will be set to a valid function.
var RegisterCallbackOnLicenseChange = func(ctx context.Context, st *cluster.Settings) {}
var RegisterCallbackOnLicenseChange = func(context.Context, *cluster.Settings, *Enforcer) {}

// LicType is the type to define the license type, as needed by the license
// enforcer.
Expand Down
110 changes: 82 additions & 28 deletions pkg/server/license/enforcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Enforcer struct {
// telemetryStatusReporter is an interface for getting the timestamp of the
// last successful ping to the telemetry server. For some licenses, sending
// telemetry data is required to avoid throttling.
telemetryStatusReporter TelemetryStatusReporter
telemetryStatusReporter atomic.Pointer[TelemetryStatusReporter]

// clusterInitGracePeriodEndTS marks the end of the grace period when a
// license is required. It is set during the cluster's initial startup. The
Expand Down Expand Up @@ -86,13 +86,18 @@ type Enforcer struct {
}

type TestingKnobs struct {
// EnableGracePeriodInitTSWrite is a control knob for writing the grace period
// initialization timestamp. It is currently set to opt-in for writing the
// timestamp as a way to stage these changes. This ensures that the timestamp
// isn't written before the other license enforcement changes are complete.
// TODO(spilchen): Change this knob to opt-out as we approach the final stages
// of the core licensing deprecation work. This will be handled in CRDB-41758.
EnableGracePeriodInitTSWrite bool
// Enable controls whether the enforcer writes the grace period end time to KV
// and performs throttle checks. This is currently opt-in to allow for a gradual
// rollout of these changes. It will be removed or changed to opt-out as we near
// the final stages of the CockroachDB core licensing deprecation.
// TODO(spilchen): Update or remove this knob closer to the completion of the
// core licensing deprecation work (CRDB-41758).
Enable bool

// SkipDisable makes the Disable() function a no-op. This is separate from Enable
// because we perform additional checks during server startup that may automatically
// disable enforcement based on configuration (e.g., for single-node instances).
SkipDisable bool

// OverrideStartTime if set, overrides the time that's used to seed the
// grace period init timestamp.
Expand All @@ -101,6 +106,10 @@ type TestingKnobs struct {
// OverrideThrottleCheckTime if set, overrides the timestamp used when
// checking if throttling is active.
OverrideThrottleCheckTime *time.Time

// OverrideMaxOpenTransactions if set, overrides the maximum open transactions
// when checking if active throttling.
OverrideMaxOpenTransactions *int64
}

// TelemetryStatusReporter is the interface we use to find the last ping
Expand All @@ -126,14 +135,16 @@ func GetEnforcerInstance() *Enforcer {

// newEnforcer creates a new Enforcer object.
func newEnforcer() *Enforcer {
return &Enforcer{
e := &Enforcer{
startTime: timeutil.Now(),
}
e.isDisabled.Store(true) // Start disabled until Start() is called
return e
}

// SetTelemetryStatusReporter will set the pointer to the telemetry status reporter.
func (e *Enforcer) SetTelemetryStatusReporter(reporter TelemetryStatusReporter) {
e.telemetryStatusReporter = reporter
e.telemetryStatusReporter.Store(&reporter)
}

// SetTesting Knobs will set the pointer to the testing knobs.
Expand All @@ -155,17 +166,34 @@ func (e *Enforcer) GetTestingKnobs() *TestingKnobs {
func (e *Enforcer) Start(
ctx context.Context, st *cluster.Settings, db isql.DB, initialStart bool,
) error {
// We always start disabled. If an error occurs, the enforcer setup will be
// incomplete, but the server will continue to start. To ensure stability in
// that case, we leave throttling disabled.
e.isDisabled.Store(true)
startDisabled := e.getInitialIsDisabledValue()

e.maybeLogActiveOverrides(ctx)

if !startDisabled {
if err := e.maybeWriteClusterInitGracePeriodTS(ctx, db, initialStart); err != nil {
return err
}
}

// Initialize assuming there is no license. This seeds necessary values. It
// must be done after setting the cluster init grace period timestamp. And it
// is needed for testing that may be running this in isolation to the license
// ccl package.
e.RefreshForLicenseChange(ctx, LicTypeNone, time.Time{})

// Add a hook into the license setting so that we refresh our state whenever
// the license changes.
RegisterCallbackOnLicenseChange(ctx, st)
// the license changes. This will also update the state for the current
// license if not in test.
RegisterCallbackOnLicenseChange(ctx, st, e)

// This should be the final step after all error checks are completed.
e.isDisabled.Store(startDisabled)

// Writing the grace period initialization timestamp is currently opt-in. See
// the EnableGracePeriodInitTSWrite comment for details.
if tk := e.GetTestingKnobs(); tk != nil && tk.EnableGracePeriodInitTSWrite {
return e.maybeWriteClusterInitGracePeriodTS(ctx, db, initialStart)
}
return nil
}

Expand All @@ -192,6 +220,7 @@ func (e *Enforcer) maybeWriteClusterInitGracePeriodTS(
if initialStart {
gracePeriodLength = 7 * 24 * time.Hour
}
gracePeriodLength = e.getGracePeriodDuration(gracePeriodLength) // Allow the value to be shortened by env var
end := e.getStartTime().Add(gracePeriodLength)
log.Infof(ctx, "generated new cluster init grace period end time: %s", end.UTC().String())
e.clusterInitGracePeriodEndTS.Store(end.Unix())
Expand Down Expand Up @@ -237,14 +266,15 @@ func (e *Enforcer) GetGracePeriodEndTS() (time.Time, bool) {
// GetTelemetryDeadline returns a timestamp of when telemetry
// data needs to be received before we start to throttle. If the license doesn't
// require telemetry, then false is returned for second return value.
func (e *Enforcer) GetTelemetryDeadline() (time.Time, bool) {
if !e.licenseRequiresTelemetry.Load() || e.telemetryStatusReporter == nil {
return time.Time{}, false
func (e *Enforcer) GetTelemetryDeadline() (deadline, lastPing time.Time, ok bool) {
if !e.licenseRequiresTelemetry.Load() || e.telemetryStatusReporter.Load() == nil {
return time.Time{}, time.Time{}, false
}

lastTelemetryDataReceived := e.telemetryStatusReporter.GetLastSuccessfulTelemetryPing()
ptr := e.telemetryStatusReporter.Load()
lastTelemetryDataReceived := (*ptr).GetLastSuccessfulTelemetryPing()
throttleTS := lastTelemetryDataReceived.Add(e.getMaxTelemetryInterval())
return throttleTS, true
return throttleTS, lastTelemetryDataReceived, true
}

// MaybeFailIfThrottled evaluates the current transaction count and license state,
Expand All @@ -254,7 +284,7 @@ func (e *Enforcer) GetTelemetryDeadline() (time.Time, bool) {
func (e *Enforcer) MaybeFailIfThrottled(ctx context.Context, txnsOpened int64) (err error) {
// Early out if the number of transactions is below the max allowed or
// everything has been disabled.
if txnsOpened < e.getMaxOpenTransactions() || e.isDisabled.Load() {
if txnsOpened <= e.getMaxOpenTransactions() || e.isDisabled.Load() {
return
}

Expand All @@ -274,16 +304,15 @@ func (e *Enforcer) MaybeFailIfThrottled(ctx context.Context, txnsOpened int64) (
return
}

if ts, ok := e.GetTelemetryDeadline(); ok && now.After(ts) {
if deadlineTS, lastPingTS, ok := e.GetTelemetryDeadline(); ok && now.After(deadlineTS) {
err = errors.WithHintf(pgerror.Newf(pgcode.CCLValidLicenseRequired,
"The maximum number of open transactions has been reached because the license requires "+
"diagnostic reporting, but none has been received by Cockroach Labs."),
"Ensure diagnostic reporting is enabled and verify that nothing is blocking network access to the "+
"Cockroach Labs reporting server. You can also consider changing your license to one that doesn't "+
"require diagnostic reporting to be emitted.")
e.maybeLogError(ctx, err, &e.lastTelemetryThrottlingLogTime,
fmt.Sprintf("due to no telemetry data received, last received at %s",
e.telemetryStatusReporter.GetLastSuccessfulTelemetryPing()))
fmt.Sprintf("due to no telemetry data received, last received at %s", lastPingTS))
return
}
return
Expand All @@ -293,7 +322,9 @@ func (e *Enforcer) MaybeFailIfThrottled(ctx context.Context, txnsOpened int64) (
// information to optimize enforcement. Instead of reading the license from the
// settings, unmarshaling it, and checking its type and expiry each time,
// caching the information improves efficiency since licenses change infrequently.
func (e *Enforcer) RefreshForLicenseChange(licType LicType, licenseExpiry time.Time) {
func (e *Enforcer) RefreshForLicenseChange(
ctx context.Context, licType LicType, licenseExpiry time.Time,
) {
e.hasLicense.Store(licType != LicTypeNone)

switch licType {
Expand All @@ -313,14 +344,19 @@ func (e *Enforcer) RefreshForLicenseChange(licType LicType, licenseExpiry time.T
e.storeNewGracePeriodEndDate(timeutil.UnixEpoch, 0)
e.licenseRequiresTelemetry.Store(false)
}

gpEnd, _ := e.GetGracePeriodEndTS()
log.Infof(ctx, "enforcer license updated: grace period ends at %q, telemetry required: %t",
gpEnd, e.licenseRequiresTelemetry.Load())
}

// Disable turns off all license enforcement for the lifetime of this object.
func (e *Enforcer) Disable(ctx context.Context) {
// We provide an override so that we can continue to test license enforcement
// policies in single-node clusters.
skipDisable := envutil.EnvOrDefaultBool("COCKROACH_SKIP_LICENSE_ENFORCEMENT_DISABLE", false)
if skipDisable {
tk := e.GetTestingKnobs()
if skipDisable || (tk != nil && tk.SkipDisable) {
return
}
log.Infof(ctx, "disable all license enforcement")
Expand Down Expand Up @@ -366,6 +402,9 @@ func (e *Enforcer) getGracePeriodDuration(defaultAndMaxLength time.Duration) tim
// throttling takes affect.
func (e *Enforcer) getMaxOpenTransactions() int64 {
newLimit := envutil.EnvOrDefaultInt64("COCKROACH_MAX_OPEN_TXNS_DURING_THROTTLE", defaultMaxOpenTransactions)
if tk := e.GetTestingKnobs(); tk != nil && tk.OverrideMaxOpenTransactions != nil {
newLimit = *tk.OverrideMaxOpenTransactions
}
// Ensure we can never increase the number of open transactions allowed.
if newLimit > defaultMaxOpenTransactions {
return defaultMaxOpenTransactions
Expand Down Expand Up @@ -422,3 +461,18 @@ func (e *Enforcer) maybeLogActiveOverrides(ctx context.Context) {
log.Infof(ctx, "max telemetry interval has changed to %v", curTelemetryInterval)
}
}

// getInitialIsDisabledValue returns bool indicating what the initial value
// should be for e.isDisabled
func (e *Enforcer) getInitialIsDisabledValue() bool {
// The enforcer is currently opt-in. This will change as we approach the
// final stages of CockroachDB core license deprecation.
// TODO(spilchen): Enable the enforcer by default in CRDB-41758.
tk := e.GetTestingKnobs()
if tk == nil {
// TODO(spilchen): In CRDB-41758, remove the use of an environment variable
// as we want to avoid providing an easy way to disable the enforcer.
return !envutil.EnvOrDefaultBool("COCKROACH_ENABLE_LICENSE_ENFORCER", false)
}
return !tk.Enable
}
Loading

0 comments on commit 67e99eb

Please sign in to comment.