From 9987c5f7143ca2647b86ddbbec28b4e1fced9437 Mon Sep 17 00:00:00 2001 From: Matt Spilchen Date: Wed, 11 Sep 2024 13:59:56 -0300 Subject: [PATCH] server/license: Integrate license throttle with SQL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prior to this, the throttling code had been implemented but wasn’t integrated with SQL. This change completes that integration, applying throttling only to external SQL queries—internal SQL queries are exempt. If throttling is active, an error is returned and propagated to the caller. For licenses requiring telemetry, the latest telemetry ping time is used in the throttling calculation by implementing the license.TelemetryStatusReporter interface. Most throttling functionality remains disabled, enabled only in unit tests or via environment variables. Full throttling will be enabled in CRDB-41758. Epic: CRDB-39988 Closes: CRDB-39991 Release note: None --- pkg/ccl/utilccl/license_check.go | 6 +- pkg/server/diagnostics/reporter.go | 7 ++ pkg/server/license/BUILD.bazel | 3 + pkg/server/license/cclbridge.go | 2 +- pkg/server/license/enforcer.go | 110 ++++++++++++++----- pkg/server/license/enforcer_test.go | 145 ++++++++++++++++++++++--- pkg/server/server_sql.go | 4 +- pkg/sql/BUILD.bazel | 1 + pkg/sql/conn_executor_exec.go | 37 ++++--- pkg/sql/conn_executor_internal_test.go | 2 + 10 files changed, 257 insertions(+), 60 deletions(-) diff --git a/pkg/ccl/utilccl/license_check.go b/pkg/ccl/utilccl/license_check.go index f4223bc0aa50..e65f462d1d27 100644 --- a/pkg/ccl/utilccl/license_check.go +++ b/pkg/ccl/utilccl/license_check.go @@ -311,7 +311,9 @@ func check(l *licenseccl.License, at time.Time, org, feature string, withDetails // RegisterCallbackOnLicenseChange will register a callback to update the // license enforcer whenever the license changes. -func RegisterCallbackOnLicenseChange(ctx context.Context, st *cluster.Settings) { +func RegisterCallbackOnLicenseChange( + ctx context.Context, st *cluster.Settings, licenseEnforcer *licenseserver.Enforcer, +) { refreshFunc := func(ctx context.Context) { lic, err := getLicense(st) if err != nil { @@ -335,7 +337,7 @@ func RegisterCallbackOnLicenseChange(ctx context.Context, st *cluster.Settings) licenseType = licenseserver.LicTypeEnterprise } } - licenseserver.GetEnforcerInstance().RefreshForLicenseChange(licenseType, licenseExpiry) + licenseEnforcer.RefreshForLicenseChange(ctx, licenseType, licenseExpiry) } // Install the hook so that we refresh license details when the license changes. enterpriseLicense.SetOnChange(&st.SV, refreshFunc) diff --git a/pkg/server/diagnostics/reporter.go b/pkg/server/diagnostics/reporter.go index 4905d11a2149..033c7ddbd84f 100644 --- a/pkg/server/diagnostics/reporter.go +++ b/pkg/server/diagnostics/reporter.go @@ -187,6 +187,13 @@ func (r *Reporter) ReportDiagnostics(ctx context.Context) { r.SQLServer.GetReportedSQLStatsController().ResetLocalSQLStats(ctx) } +// GetLastSuccessfulTelemetryPing will return the timestamp of when we last got +// a ping back from the registration server. +func (r *Reporter) GetLastSuccessfulTelemetryPing() time.Time { + ts := timeutil.Unix(r.LastSuccessfulTelemetryPing.Load(), 0) + return ts +} + // CreateReport generates a new diagnostics report containing information about // the current node or tenant. func (r *Reporter) CreateReport( diff --git a/pkg/server/license/BUILD.bazel b/pkg/server/license/BUILD.bazel index 6a771af2f624..3d7c8a6f6aad 100644 --- a/pkg/server/license/BUILD.bazel +++ b/pkg/server/license/BUILD.bazel @@ -36,7 +36,10 @@ go_test( "//pkg/server", "//pkg/sql", "//pkg/sql/catalog/descs", + "//pkg/sql/isql", + "//pkg/sql/sessiondata", "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/server/license/cclbridge.go b/pkg/server/license/cclbridge.go index 72b8f8dc40eb..0c4a36e02a6a 100644 --- a/pkg/server/license/cclbridge.go +++ b/pkg/server/license/cclbridge.go @@ -23,7 +23,7 @@ import ( // RegisterCallbackOnLicenseChange is a pointer to a function that will register // a callback when the license changes. This is initially empty here. When // initializing the ccl package, this variable will be set to a valid function. -var RegisterCallbackOnLicenseChange = func(ctx context.Context, st *cluster.Settings) {} +var RegisterCallbackOnLicenseChange = func(context.Context, *cluster.Settings, *Enforcer) {} // LicType is the type to define the license type, as needed by the license // enforcer. diff --git a/pkg/server/license/enforcer.go b/pkg/server/license/enforcer.go index 8c51c09931b3..69dac7e65807 100644 --- a/pkg/server/license/enforcer.go +++ b/pkg/server/license/enforcer.go @@ -45,7 +45,7 @@ type Enforcer struct { // telemetryStatusReporter is an interface for getting the timestamp of the // last successful ping to the telemetry server. For some licenses, sending // telemetry data is required to avoid throttling. - telemetryStatusReporter TelemetryStatusReporter + telemetryStatusReporter atomic.Pointer[TelemetryStatusReporter] // clusterInitGracePeriodEndTS marks the end of the grace period when a // license is required. It is set during the cluster's initial startup. The @@ -86,13 +86,18 @@ type Enforcer struct { } type TestingKnobs struct { - // EnableGracePeriodInitTSWrite is a control knob for writing the grace period - // initialization timestamp. It is currently set to opt-in for writing the - // timestamp as a way to stage these changes. This ensures that the timestamp - // isn't written before the other license enforcement changes are complete. - // TODO(spilchen): Change this knob to opt-out as we approach the final stages - // of the core licensing deprecation work. This will be handled in CRDB-41758. - EnableGracePeriodInitTSWrite bool + // Enable controls whether the enforcer writes the grace period end time to KV + // and performs throttle checks. This is currently opt-in to allow for a gradual + // rollout of these changes. It will be removed or changed to opt-out as we near + // the final stages of the CockroachDB core licensing deprecation. + // TODO(spilchen): Update or remove this knob closer to the completion of the + // core licensing deprecation work (CRDB-41758). + Enable bool + + // SkipDisable makes the Disable() function a no-op. This is separate from Enable + // because we perform additional checks during server startup that may automatically + // disable enforcement based on configuration (e.g., for single-node instances). + SkipDisable bool // OverrideStartTime if set, overrides the time that's used to seed the // grace period init timestamp. @@ -101,6 +106,10 @@ type TestingKnobs struct { // OverrideThrottleCheckTime if set, overrides the timestamp used when // checking if throttling is active. OverrideThrottleCheckTime *time.Time + + // OverrideMaxOpenTransactions if set, overrides the maximum open transactions + // when checking if active throttling. + OverrideMaxOpenTransactions *int64 } // TelemetryStatusReporter is the interface we use to find the last ping @@ -126,14 +135,16 @@ func GetEnforcerInstance() *Enforcer { // newEnforcer creates a new Enforcer object. func newEnforcer() *Enforcer { - return &Enforcer{ + e := &Enforcer{ startTime: timeutil.Now(), } + e.isDisabled.Store(true) // Start disabled until Start() is called + return e } // SetTelemetryStatusReporter will set the pointer to the telemetry status reporter. func (e *Enforcer) SetTelemetryStatusReporter(reporter TelemetryStatusReporter) { - e.telemetryStatusReporter = reporter + e.telemetryStatusReporter.Store(&reporter) } // SetTesting Knobs will set the pointer to the testing knobs. @@ -155,17 +166,34 @@ func (e *Enforcer) GetTestingKnobs() *TestingKnobs { func (e *Enforcer) Start( ctx context.Context, st *cluster.Settings, db isql.DB, initialStart bool, ) error { + // We always start disabled. If an error occurs, the enforcer setup will be + // incomplete, but the server will continue to start. To ensure stability in + // that case, we leave throttling disabled. + e.isDisabled.Store(true) + startDisabled := e.getInitialIsDisabledValue() + e.maybeLogActiveOverrides(ctx) + if !startDisabled { + if err := e.maybeWriteClusterInitGracePeriodTS(ctx, db, initialStart); err != nil { + return err + } + } + + // Initialize assuming there is no license. This seeds necessary values. It + // must be done after setting the cluster init grace period timestamp. And it + // is needed for testing that may be running this in isolation to the license + // ccl package. + e.RefreshForLicenseChange(ctx, LicTypeNone, time.Time{}) + // Add a hook into the license setting so that we refresh our state whenever - // the license changes. - RegisterCallbackOnLicenseChange(ctx, st) + // the license changes. This will also update the state for the current + // license if not in test. + RegisterCallbackOnLicenseChange(ctx, st, e) + + // This should be the final step after all error checks are completed. + e.isDisabled.Store(startDisabled) - // Writing the grace period initialization timestamp is currently opt-in. See - // the EnableGracePeriodInitTSWrite comment for details. - if tk := e.GetTestingKnobs(); tk != nil && tk.EnableGracePeriodInitTSWrite { - return e.maybeWriteClusterInitGracePeriodTS(ctx, db, initialStart) - } return nil } @@ -192,6 +220,7 @@ func (e *Enforcer) maybeWriteClusterInitGracePeriodTS( if initialStart { gracePeriodLength = 7 * 24 * time.Hour } + gracePeriodLength = e.getGracePeriodDuration(gracePeriodLength) // Allow the value to be shortened by env var end := e.getStartTime().Add(gracePeriodLength) log.Infof(ctx, "generated new cluster init grace period end time: %s", end.UTC().String()) e.clusterInitGracePeriodEndTS.Store(end.Unix()) @@ -237,14 +266,15 @@ func (e *Enforcer) GetGracePeriodEndTS() (time.Time, bool) { // GetTelemetryDeadline returns a timestamp of when telemetry // data needs to be received before we start to throttle. If the license doesn't // require telemetry, then false is returned for second return value. -func (e *Enforcer) GetTelemetryDeadline() (time.Time, bool) { - if !e.licenseRequiresTelemetry.Load() || e.telemetryStatusReporter == nil { - return time.Time{}, false +func (e *Enforcer) GetTelemetryDeadline() (deadline, lastPing time.Time, ok bool) { + if !e.licenseRequiresTelemetry.Load() || e.telemetryStatusReporter.Load() == nil { + return time.Time{}, time.Time{}, false } - lastTelemetryDataReceived := e.telemetryStatusReporter.GetLastSuccessfulTelemetryPing() + ptr := e.telemetryStatusReporter.Load() + lastTelemetryDataReceived := (*ptr).GetLastSuccessfulTelemetryPing() throttleTS := lastTelemetryDataReceived.Add(e.getMaxTelemetryInterval()) - return throttleTS, true + return throttleTS, lastTelemetryDataReceived, true } // MaybeFailIfThrottled evaluates the current transaction count and license state, @@ -254,7 +284,7 @@ func (e *Enforcer) GetTelemetryDeadline() (time.Time, bool) { func (e *Enforcer) MaybeFailIfThrottled(ctx context.Context, txnsOpened int64) (err error) { // Early out if the number of transactions is below the max allowed or // everything has been disabled. - if txnsOpened < e.getMaxOpenTransactions() || e.isDisabled.Load() { + if txnsOpened <= e.getMaxOpenTransactions() || e.isDisabled.Load() { return } @@ -274,7 +304,7 @@ func (e *Enforcer) MaybeFailIfThrottled(ctx context.Context, txnsOpened int64) ( return } - if ts, ok := e.GetTelemetryDeadline(); ok && now.After(ts) { + if deadlineTS, lastPingTS, ok := e.GetTelemetryDeadline(); ok && now.After(deadlineTS) { err = errors.WithHintf(pgerror.Newf(pgcode.CCLValidLicenseRequired, "The maximum number of open transactions has been reached because the license requires "+ "diagnostic reporting, but none has been received by Cockroach Labs."), @@ -282,8 +312,7 @@ func (e *Enforcer) MaybeFailIfThrottled(ctx context.Context, txnsOpened int64) ( "Cockroach Labs reporting server. You can also consider changing your license to one that doesn't "+ "require diagnostic reporting to be emitted.") e.maybeLogError(ctx, err, &e.lastTelemetryThrottlingLogTime, - fmt.Sprintf("due to no telemetry data received, last received at %s", - e.telemetryStatusReporter.GetLastSuccessfulTelemetryPing())) + fmt.Sprintf("due to no telemetry data received, last received at %s", lastPingTS)) return } return @@ -293,7 +322,9 @@ func (e *Enforcer) MaybeFailIfThrottled(ctx context.Context, txnsOpened int64) ( // information to optimize enforcement. Instead of reading the license from the // settings, unmarshaling it, and checking its type and expiry each time, // caching the information improves efficiency since licenses change infrequently. -func (e *Enforcer) RefreshForLicenseChange(licType LicType, licenseExpiry time.Time) { +func (e *Enforcer) RefreshForLicenseChange( + ctx context.Context, licType LicType, licenseExpiry time.Time, +) { e.hasLicense.Store(licType != LicTypeNone) switch licType { @@ -313,6 +344,10 @@ func (e *Enforcer) RefreshForLicenseChange(licType LicType, licenseExpiry time.T e.storeNewGracePeriodEndDate(timeutil.UnixEpoch, 0) e.licenseRequiresTelemetry.Store(false) } + + gpEnd, _ := e.GetGracePeriodEndTS() + log.Infof(ctx, "enforcer license updated: grace period ends at %q, telemetry required: %t", + gpEnd, e.licenseRequiresTelemetry.Load()) } // Disable turns off all license enforcement for the lifetime of this object. @@ -320,7 +355,8 @@ func (e *Enforcer) Disable(ctx context.Context) { // We provide an override so that we can continue to test license enforcement // policies in single-node clusters. skipDisable := envutil.EnvOrDefaultBool("COCKROACH_SKIP_LICENSE_ENFORCEMENT_DISABLE", false) - if skipDisable { + tk := e.GetTestingKnobs() + if skipDisable || (tk != nil && tk.SkipDisable) { return } log.Infof(ctx, "disable all license enforcement") @@ -366,6 +402,9 @@ func (e *Enforcer) getGracePeriodDuration(defaultAndMaxLength time.Duration) tim // throttling takes affect. func (e *Enforcer) getMaxOpenTransactions() int64 { newLimit := envutil.EnvOrDefaultInt64("COCKROACH_MAX_OPEN_TXNS_DURING_THROTTLE", defaultMaxOpenTransactions) + if tk := e.GetTestingKnobs(); tk != nil && tk.OverrideMaxOpenTransactions != nil { + newLimit = *tk.OverrideMaxOpenTransactions + } // Ensure we can never increase the number of open transactions allowed. if newLimit > defaultMaxOpenTransactions { return defaultMaxOpenTransactions @@ -422,3 +461,18 @@ func (e *Enforcer) maybeLogActiveOverrides(ctx context.Context) { log.Infof(ctx, "max telemetry interval has changed to %v", curTelemetryInterval) } } + +// getInitialIsDisabledValue returns bool indicating what the initial value +// should be for e.isDisabled +func (e *Enforcer) getInitialIsDisabledValue() bool { + // The enforcer is currently opt-in. This will change as we approach the + // final stages of CockroachDB core license deprecation. + // TODO(spilchen): Enable the enforcer by default in CRDB-41758. + tk := e.GetTestingKnobs() + if tk == nil { + // TODO(spilchen): In CRDB-41758, remove the use of an environment variable + // as we want to avoid providing an easy way to disable the enforcer. + return !envutil.EnvOrDefaultBool("COCKROACH_ENABLE_LICENSE_ENFORCER", false) + } + return !tk.Enable +} diff --git a/pkg/server/license/enforcer_test.go b/pkg/server/license/enforcer_test.go index aacf38e2dc2c..eec6317842d6 100644 --- a/pkg/server/license/enforcer_test.go +++ b/pkg/server/license/enforcer_test.go @@ -22,7 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/license" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -51,8 +54,8 @@ func TestGracePeriodInitTSCache(t *testing.T) { Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ LicenseTestingKnobs: license.TestingKnobs{ - EnableGracePeriodInitTSWrite: true, - OverrideStartTime: &ts1, + Enable: true, + OverrideStartTime: &ts1, }, }, }, @@ -65,8 +68,8 @@ func TestGracePeriodInitTSCache(t *testing.T) { ts2 := ts1.Add(1) ts2End := ts2.Add(7 * 24 * time.Hour) // Calculate the end of the grace period enforcer.SetTestingKnobs(&license.TestingKnobs{ - EnableGracePeriodInitTSWrite: true, - OverrideStartTime: &ts2, + Enable: true, + OverrideStartTime: &ts2, }) // Ensure request for the grace period init ts1 before start just returns the start // time used when the enforcer was created. @@ -115,36 +118,36 @@ func TestThrottle(t *testing.T) { }{ // Expired free license but under the transaction threshold {UnderTxnThreshold, license.LicTypeFree, t0, t1d, t8d, t45d, ""}, - // Expired trial license but under the transaction threshold - {UnderTxnThreshold, license.LicTypeTrial, t0, t30d, t8d, t45d, ""}, + // Expired trial license but at the transaction threshold + {AtTxnThreshold, license.LicTypeTrial, t0, t30d, t8d, t45d, ""}, // Over the transaction threshold but not expired {OverTxnThreshold, license.LicTypeFree, t0, t10d, t45d, t10d, ""}, // Expired free license, past the grace period - {AtTxnThreshold, license.LicTypeFree, t0, t30d, t10d, t45d, "License expired"}, + {OverTxnThreshold, license.LicTypeFree, t0, t30d, t10d, t45d, "License expired"}, // Expired free license, but not past the grace period {OverTxnThreshold, license.LicTypeFree, t0, t30d, t10d, t17d, ""}, // Valid free license, but telemetry ping hasn't been received in 7 days. - {AtTxnThreshold, license.LicTypeFree, t0, t10d, t45d, t17d, ""}, + {OverTxnThreshold, license.LicTypeFree, t0, t10d, t45d, t17d, ""}, // Valid free license, but telemetry ping hasn't been received in 8 days. {OverTxnThreshold, license.LicTypeFree, t0, t10d, t45d, t18d, "diagnostic reporting"}, // No license but within grace period still - {AtTxnThreshold, license.LicTypeNone, t0, t0, t0, t1d, ""}, + {OverTxnThreshold, license.LicTypeNone, t0, t0, t0, t1d, ""}, // No license but beyond grace period {OverTxnThreshold, license.LicTypeNone, t0, t0, t0, t8d, "No license installed"}, // Trial license has expired but still within grace period - {AtTxnThreshold, license.LicTypeTrial, t0, t30d, t10d, t15d, ""}, + {OverTxnThreshold, license.LicTypeTrial, t0, t30d, t10d, t15d, ""}, // Trial license has expired and just at the edge of the grace period. {OverTxnThreshold, license.LicTypeTrial, t0, t45d, t10d, t17d, ""}, // Trial license has expired and just beyond the grace period. - {AtTxnThreshold, license.LicTypeTrial, t0, t45d, t10d, t18d, "License expired"}, + {OverTxnThreshold, license.LicTypeTrial, t0, t45d, t10d, t18d, "License expired"}, // No throttling if past the expiry of an enterprise license {OverTxnThreshold, license.LicTypeEnterprise, t0, t0, t8d, t46d, ""}, // Telemetry isn't needed for enterprise license - {AtTxnThreshold, license.LicTypeEnterprise, t0, t0, t45d, t30d, ""}, + {OverTxnThreshold, license.LicTypeEnterprise, t0, t0, t45d, t30d, ""}, // Telemetry isn't needed for evaluation license {OverTxnThreshold, license.LicTypeEvaluation, t0, t0, t45d, t30d, ""}, // Evaluation license doesn't throttle if expired but within grace period. - {AtTxnThreshold, license.LicTypeEvaluation, t0, t0, t15d, t30d, ""}, + {OverTxnThreshold, license.LicTypeEvaluation, t0, t0, t15d, t30d, ""}, // Evaluation license does throttle if expired and beyond grace period. {OverTxnThreshold, license.LicTypeEvaluation, t0, t0, t15d, t46d, "License expired"}, } { @@ -157,7 +160,7 @@ func TestThrottle(t *testing.T) { e.SetTelemetryStatusReporter(&mockTelemetryStatusReporter{ lastPingTime: tc.lastTelemetryPingTime, }) - e.RefreshForLicenseChange(tc.licType, tc.licExpiry) + e.RefreshForLicenseChange(ctx, tc.licType, tc.licExpiry) err := e.MaybeFailIfThrottled(ctx, tc.openTxnsCount) if tc.expectedErrRegex == "" { require.NoError(t, err) @@ -171,3 +174,117 @@ func TestThrottle(t *testing.T) { }) } } + +func TestThrottleErrorMsg(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Set of times that we'll use in the test. + // + // Initial time that other timestamps are derived from + t0d := timeutil.Unix(1724329716, 0) + // 10 days after start. License is valid. + t10d := t0d.Add(10 * 24 * time.Hour) + // 30 days after start. This is when the license will expire. + t30d := t0d.Add(30 * 24 * time.Hour) + // 55 days after initial time. This is still within grace period. + t55d := t0d.Add(55 * 24 * time.Hour) + // 60 days after initial time. This is the end of the grace period. Throttling + // may happen anytime after this. + t60d := t0d.Add(60 * 24 * time.Hour) + // 1ms past the grace period end time. + t60d1ms := t60d.Add(time.Millisecond) + + // Pointer to the timestamp that we'll use for the throttle check. This is + // modified for every test unit. + throttleCheckTS := &time.Time{} + + // Controls the maximum number of open transactions to simulate concurrency. + // This value can be modified by individual tests through testing knobs based + // on whether the open transactions are above or below the threshold. + var maxOpenTransactions int64 = 5 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + LicenseTestingKnobs: license.TestingKnobs{ + Enable: true, + // The mock server we bring up is single-node, which disables all + // throttling checks. We need to avoid that for this test to verify + // the throttle message. + SkipDisable: true, + // We are going to modify the throttle check timestamp in each test + // unit. + OverrideThrottleCheckTime: throttleCheckTS, + // And we will modify what is the max open transactions to force us + // over the limit. + OverrideMaxOpenTransactions: &maxOpenTransactions, + }, + }, + }, + }) + defer srv.Stopper().Stop(ctx) + + // Set up a free license that will expire in 30 days + licenseEnforcer := srv.SystemLayer().ExecutorConfig().(sql.ExecutorConfig).LicenseEnforcer + licenseEnforcer.RefreshForLicenseChange(ctx, license.LicTypeFree, t30d) + + for _, tc := range []struct { + desc string + // overThreshold will control if we are above the limit of max open transactions. + overThreshold bool + throttleCheckTS time.Time + telemetryTS time.Time + errRE string + }{ + // NB: license expires at t30d and grace period ends at t60d. + {"at-threshold-valid-license-and-telemetry", true, t10d, t10d, ""}, + {"at-threshold-expired-license-in-grace-valid-telemetry", true, t60d, t55d, ""}, + {"at-threshold-expired-license-past-grace-valid-telemetry", true, t60d1ms, t55d, + "License expired. The maximum number of open transactions has been reached"}, + {"below-threshold-expired-license-past-grace-valid-telemetry", false, t60d1ms, t55d, ""}, + {"at-threshold-expired-license-in-grace-invalid-telemetry", true, t55d, t10d, + "The maximum number of open transactions has been reached because the license requires diagnostic reporting"}, + {"at-threshold-valid-license-invalid-telemetry", true, t10d, t0d, + "The maximum number of open transactions has been reached because the license requires diagnostic reporting"}, + {"below-threshold-invalid-telemetry", false, t10d, t0d, ""}, + } { + t.Run(tc.desc, func(t *testing.T) { + // Adjust the throttle check time for this test unit + *throttleCheckTS = tc.throttleCheckTS + + // Override the telemetry server so we have control of what the last ping + // time was. + licenseEnforcer.SetTelemetryStatusReporter(&mockTelemetryStatusReporter{lastPingTime: tc.telemetryTS}) + + // Override the max open transactions based on whether we are above or + // below the open transaction limit. + if tc.overThreshold { + maxOpenTransactions = 0 + } else { + maxOpenTransactions = 5 + } + + // Open a transaction. First, check external connections. + // It may or may not be throttled, depending on tc parms. + tdb := sqlutils.MakeSQLRunner(sqlDB) + if tc.errRE != "" { + tdb.ExpectErr(t, tc.errRE, "SELECT 1") + } else { + tdb.Exec(t, "SELECT 1") + } + + // Confirm that internal connections are never throttled + idb := srv.InternalDB().(isql.DB) + err := idb.Txn(ctx, func(ctx context.Context, tx isql.Txn) error { + _, err := tx.ExecEx(ctx, "internal query throttle test", tx.KV(), + sessiondata.NodeUserSessionDataOverride, "SELECT 1") + return err + }) + require.NoError(t, err) + }) + } +} diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 93061c1fdbf4..2f3cebbb6f5b 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -1854,14 +1854,16 @@ func (s *SQLServer) startLicenseEnforcer( // it requires access to the system keyspace. For secondary tenants, this struct // is shared to provide access to the values cached from the KV read. if s.execCfg.Codec.ForSystemTenant() { + licenseEnforcer := s.execCfg.LicenseEnforcer if knobs.Server != nil { s.execCfg.LicenseEnforcer.SetTestingKnobs(&knobs.Server.(*TestingKnobs).LicenseTestingKnobs) } + licenseEnforcer.SetTelemetryStatusReporter(s.diagnosticsReporter) // TODO(spilchen): we need to tell the license enforcer about the // diagnostics reporter. This will be handled in CRDB-39991 err := startup.RunIdempotentWithRetry(ctx, s.stopper.ShouldQuiesce(), "license enforcer start", func(ctx context.Context) error { - return s.execCfg.LicenseEnforcer.Start(ctx, s.cfg.Settings, s.internalDB, initialStart) + return licenseEnforcer.Start(ctx, s.cfg.Settings, s.internalDB, initialStart) }) // This is not a critical component. If it fails to start, we log a warning // rather than prevent the entire server from starting. diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 1b10c21c823b..31a5b9a1ebd0 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -795,6 +795,7 @@ go_test( "//pkg/security/securitytest", "//pkg/security/username", "//pkg/server", + "//pkg/server/license", "//pkg/server/serverpb", "//pkg/server/settingswatcher", "//pkg/server/srvtestutils", diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index a59488fc962c..0a5101d79d98 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -560,25 +560,34 @@ func (ex *connExecutor) execStmtInOpenState( p.noticeSender = res ih := &p.instrumentation - if maxOpen := maxOpenTransactions.Get(&ex.server.cfg.Settings.SV); maxOpen > 0 && ex.executorType != executorTypeInternal { + if ex.executorType != executorTypeInternal { // NB: ex.metrics includes internal executor transactions when executorType // is executorTypeInternal, so that's why we exclude internal executors // in the conditional. - if ex.metrics.EngineMetrics.SQLTxnsOpen.Value() > maxOpen { - hasAdmin, err := ex.planner.HasAdminRole(ctx) - if err != nil { - return makeErrEvent(err) - } - if !hasAdmin { - return makeErrEvent(errors.WithHintf( - pgerror.Newf( - pgcode.ConfigurationLimitExceeded, - "cannot execute operation due to server.max_open_transactions_per_gateway cluster setting", - ), - "the maximum number of open transactions is %d", maxOpen, - )) + curOpen := ex.metrics.EngineMetrics.SQLTxnsOpen.Value() + if maxOpen := maxOpenTransactions.Get(&ex.server.cfg.Settings.SV); maxOpen > 0 { + if curOpen > maxOpen { + hasAdmin, err := ex.planner.HasAdminRole(ctx) + if err != nil { + return makeErrEvent(err) + } + if !hasAdmin { + return makeErrEvent(errors.WithHintf( + pgerror.Newf( + pgcode.ConfigurationLimitExceeded, + "cannot execute operation due to server.max_open_transactions_per_gateway cluster setting", + ), + "the maximum number of open transactions is %d", maxOpen, + )) + } } } + + // Enforce license policies. Throttling can occur if there is no valid + // license or if it has expired. + if err := ex.server.cfg.LicenseEnforcer.MaybeFailIfThrottled(ctx, curOpen); err != nil { + return makeErrEvent(err) + } } // Special top-level handling for EXPLAIN ANALYZE. diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index 8db040769fcd..a7e4d53c7193 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/license" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" @@ -345,6 +346,7 @@ func startConnExecutor( StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, st), HistogramWindowInterval: base.DefaultHistogramWindowInterval(), CollectionFactory: collectionFactory, + LicenseEnforcer: license.GetEnforcerInstance(), } s := NewServer(cfg, pool)