diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 65983ec52db7..4383825bf8d4 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -163,7 +163,7 @@ func (d *datadrivenTestState) addServer( return errors.New("unable to parse tempCleanupFrequency during server creation") } settings := cluster.MakeTestingClusterSettings() - sql.TempObjectCleanupInterval.Override(&settings.SV, duration) + sql.TempObjectCleanupInterval.Override(context.Background(), &settings.SV, duration) params.ServerArgs.Settings = settings } diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index 99efe6cb5af8..c4f7880f0a76 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -48,6 +48,7 @@ import ( func TestMaxImportBatchSize(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() testCases := []struct { importBatchSize int64 @@ -61,8 +62,8 @@ func TestMaxImportBatchSize(t *testing.T) { } for i, testCase := range testCases { st := cluster.MakeTestingClusterSettings() - storageccl.ImportBatchSize.Override(&st.SV, testCase.importBatchSize) - kvserver.MaxCommandSize.Override(&st.SV, testCase.maxCommandSize) + storageccl.ImportBatchSize.Override(ctx, &st.SV, testCase.importBatchSize) + kvserver.MaxCommandSize.Override(ctx, &st.SV, testCase.maxCommandSize) if e, a := storageccl.MaxImportBatchSize(st), testCase.expected; e != a { t.Errorf("%d: expected max batch size %d, but got %d", i, e, a) } @@ -163,6 +164,7 @@ func clientKVsToEngineKVs(kvs []kv.KeyValue) []storage.MVCCKeyValue { func TestImport(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() t.Run("batch=default", func(t *testing.T) { runTestImport(t, func(_ *cluster.Settings) {}) }) @@ -170,7 +172,7 @@ func TestImport(t *testing.T) { // The test normally doesn't trigger the batching behavior, so lower // the threshold to force it. init := func(st *cluster.Settings) { - storageccl.ImportBatchSize.Override(&st.SV, 1) + storageccl.ImportBatchSize.Override(ctx, &st.SV, 1) } runTestImport(t, init) }) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index e92055285f85..340490d7189e 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -3186,6 +3186,7 @@ func TestChangefeedTelemetry(t *testing.T) { func TestChangefeedMemBufferCapacityErrorRetryable(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() // memLimitTest returns a test runner which starts numFeeds changefeeds, // and verifies that memory limits are honored. @@ -3230,7 +3231,7 @@ func TestChangefeedMemBufferCapacityErrorRetryable(t *testing.T) { // Each changefeed gets enough memory to work by itself, but not enough // to have all the changefeeds succeed. changefeedbase.PerChangefeedMemLimit.Override( - &ff.Server().ClusterSettings().SV, 2*mon.DefaultPoolAllocationSize) + ctx, &ff.Server().ClusterSettings().SV, 2*mon.DefaultPoolAllocationSize) // beforeEmitRowCh is used to block feeds from processing messages. // This channel is closed below to speed up test termination. diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index 22f94834f8e7..dac6bd477d04 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -72,9 +72,10 @@ func TestEvalFollowerReadOffset(t *testing.T) { func TestZeroDurationDisablesFollowerReadOffset(t *testing.T) { defer leaktest.AfterTest(t)() defer utilccl.TestingEnableEnterprise()() + ctx := context.Background() st := cluster.MakeTestingClusterSettings() - closedts.TargetDuration.Override(&st.SV, 0) + closedts.TargetDuration.Override(ctx, &st.SV, 0) if offset, err := evalFollowerReadOffset(uuid.MakeV4(), st); err != nil { t.Fatal(err) } else if offset != math.MinInt64 { @@ -84,6 +85,7 @@ func TestZeroDurationDisablesFollowerReadOffset(t *testing.T) { func TestCanSendToFollower(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() clock := hlc.NewClock(hlc.UnixNano, base.DefaultMaxClockOffset) stale := clock.Now().Add(2*expectedFollowerReadOffset.Nanoseconds(), 0) current := clock.Now() @@ -273,9 +275,9 @@ func TestCanSendToFollower(t *testing.T) { defer utilccl.TestingEnableEnterprise()() } st := cluster.MakeTestingClusterSettings() - kvserver.FollowerReadsEnabled.Override(&st.SV, !c.disabledFollowerReads) + kvserver.FollowerReadsEnabled.Override(ctx, &st.SV, !c.disabledFollowerReads) if c.zeroTargetDuration { - closedts.TargetDuration.Override(&st.SV, 0) + closedts.TargetDuration.Override(ctx, &st.SV, 0) } can := canSendToFollower(uuid.MakeV4(), st, clock, c.ctPolicy, c.ba) @@ -286,13 +288,14 @@ func TestCanSendToFollower(t *testing.T) { func TestFollowerReadMultipleValidation(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() defer func() { if r := recover(); r == nil { t.Fatalf("expected panic from setting followerReadMultiple to .1") } }() st := cluster.MakeTestingClusterSettings() - followerReadMultiple.Override(&st.SV, .1) + followerReadMultiple.Override(ctx, &st.SV, .1) } // mockNodeStore implements the kvcoord.NodeDescStore interface. @@ -470,7 +473,7 @@ func TestOracle(t *testing.T) { defer utilccl.TestingEnableEnterprise()() } st := cluster.MakeTestingClusterSettings() - kvserver.FollowerReadsEnabled.Override(&st.SV, !c.disabledFollowerReads) + kvserver.FollowerReadsEnabled.Override(ctx, &st.SV, !c.disabledFollowerReads) o := replicaoracle.NewOracle(followerReadOraclePolicy, replicaoracle.Config{ NodeDescs: nodes, diff --git a/pkg/ccl/oidcccl/authentication_oidc.go b/pkg/ccl/oidcccl/authentication_oidc.go index f8656dd45446..c77ced1c54e9 100644 --- a/pkg/ccl/oidcccl/authentication_oidc.go +++ b/pkg/ccl/oidcccl/authentication_oidc.go @@ -410,35 +410,35 @@ var ConfigureOIDC = func( reloadConfig(serverCtx, oidcAuthentication, locality, st) - OIDCEnabled.SetOnChange(&st.SV, func() { - reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st) + OIDCEnabled.SetOnChange(&st.SV, func(ctx context.Context) { + reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st) }) - OIDCClientID.SetOnChange(&st.SV, func() { - reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st) + OIDCClientID.SetOnChange(&st.SV, func(ctx context.Context) { + reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st) }) - OIDCClientSecret.SetOnChange(&st.SV, func() { - reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st) + OIDCClientSecret.SetOnChange(&st.SV, func(ctx context.Context) { + reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st) }) - OIDCRedirectURL.SetOnChange(&st.SV, func() { - reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st) + OIDCRedirectURL.SetOnChange(&st.SV, func(ctx context.Context) { + reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st) }) - OIDCProviderURL.SetOnChange(&st.SV, func() { - reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st) + OIDCProviderURL.SetOnChange(&st.SV, func(ctx context.Context) { + reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st) }) - OIDCScopes.SetOnChange(&st.SV, func() { - reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st) + OIDCScopes.SetOnChange(&st.SV, func(ctx context.Context) { + reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st) }) - OIDCClaimJSONKey.SetOnChange(&st.SV, func() { - reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st) + OIDCClaimJSONKey.SetOnChange(&st.SV, func(ctx context.Context) { + reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st) }) - OIDCPrincipalRegex.SetOnChange(&st.SV, func() { - reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st) + OIDCPrincipalRegex.SetOnChange(&st.SV, func(ctx context.Context) { + reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st) }) - OIDCButtonText.SetOnChange(&st.SV, func() { - reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st) + OIDCButtonText.SetOnChange(&st.SV, func(ctx context.Context) { + reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st) }) - OIDCAutoLogin.SetOnChange(&st.SV, func() { - reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st) + OIDCAutoLogin.SetOnChange(&st.SV, func(ctx context.Context) { + reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st) }) return oidcAuthentication, nil diff --git a/pkg/ccl/utilccl/license_check_test.go b/pkg/ccl/utilccl/license_check_test.go index 055bfe8f8227..ae85598f2f9b 100644 --- a/pkg/ccl/utilccl/license_check_test.go +++ b/pkg/ccl/utilccl/license_check_test.go @@ -25,6 +25,7 @@ func TestSettingAndCheckingLicense(t *testing.T) { idA, _ := uuid.FromString("A0000000-0000-0000-0000-00000000000A") idB, _ := uuid.FromString("B0000000-0000-0000-0000-00000000000B") + ctx := context.Background() t0 := timeutil.Unix(0, 0) licA, _ := (&licenseccl.License{ @@ -60,7 +61,7 @@ func TestSettingAndCheckingLicense(t *testing.T) { {"", idA, t0, "requires an enterprise license"}, } { updater := st.MakeUpdater() - if err := updater.Set("enterprise.license", tc.lic, "s"); err != nil { + if err := updater.Set(ctx, "enterprise.license", tc.lic, "s"); err != nil { t.Fatal(err) } err := checkEnterpriseEnabledAt(st, tc.checkTime, tc.checkCluster, "", "", true) @@ -72,6 +73,7 @@ func TestSettingAndCheckingLicense(t *testing.T) { } func TestGetLicenseTypePresent(t *testing.T) { + ctx := context.Background() for _, tc := range []struct { licenseType licenseccl.License_Type expected string @@ -87,7 +89,7 @@ func TestGetLicenseTypePresent(t *testing.T) { Type: tc.licenseType, ValidUntilUnixSec: 0, }).Encode() - if err := updater.Set("enterprise.license", lic, "s"); err != nil { + if err := updater.Set(ctx, "enterprise.license", lic, "s"); err != nil { t.Fatal(err) } actual, err := getLicenseType(st) @@ -112,6 +114,7 @@ func TestGetLicenseTypeAbsent(t *testing.T) { } func TestSettingBadLicenseStrings(t *testing.T) { + ctx := context.Background() for _, tc := range []struct{ lic, err string }{ {"blah", "invalid license string"}, {"cl-0-blah", "invalid license string"}, @@ -119,7 +122,7 @@ func TestSettingBadLicenseStrings(t *testing.T) { st := cluster.MakeTestingClusterSettings() u := st.MakeUpdater() - if err := u.Set("enterprise.license", tc.lic, "s"); !testutils.IsError( + if err := u.Set(ctx, "enterprise.license", tc.lic, "s"); !testutils.IsError( err, tc.err, ) { t.Fatalf("%q: expected err %q, got %v", tc.lic, tc.err, err) @@ -128,6 +131,7 @@ func TestSettingBadLicenseStrings(t *testing.T) { } func TestTimeToEnterpriseLicenseExpiry(t *testing.T) { + ctx := context.Background() id, _ := uuid.FromString("A0000000-0000-0000-0000-00000000000A") t0 := timeutil.Unix(1603926294, 0) @@ -171,7 +175,7 @@ func TestTimeToEnterpriseLicenseExpiry(t *testing.T) { {"No License", "", 0}, } { t.Run(tc.desc, func(t *testing.T) { - if err := updater.Set("enterprise.license", tc.lic, "s"); err != nil { + if err := updater.Set(ctx, "enterprise.license", tc.lic, "s"); err != nil { t.Fatal(err) } diff --git a/pkg/cli/connect_join_test.go b/pkg/cli/connect_join_test.go index 773381e2f991..71af5a1f30ff 100644 --- a/pkg/cli/connect_join_test.go +++ b/pkg/cli/connect_join_test.go @@ -38,7 +38,7 @@ func TestNodeJoin(t *testing.T) { defer cancel() settings := cluster.MakeTestingClusterSettings() - sql.FeatureTLSAutoJoinEnabled.Override(&settings.SV, true) + sql.FeatureTLSAutoJoinEnabled.Override(ctx, &settings.SV, true) s, sqldb, _ := serverutils.StartServer(t, base.TestServerArgs{ Settings: settings, }) @@ -86,7 +86,7 @@ func TestNodeJoinBadToken(t *testing.T) { defer cancel() settings := cluster.MakeTestingClusterSettings() - sql.FeatureTLSAutoJoinEnabled.Override(&settings.SV, true) + sql.FeatureTLSAutoJoinEnabled.Override(ctx, &settings.SV, true) s, sqldb, _ := serverutils.StartServer(t, base.TestServerArgs{ Settings: settings, }) diff --git a/pkg/cli/gen.go b/pkg/cli/gen.go index 22055c5c0b93..4e762a30a561 100644 --- a/pkg/cli/gen.go +++ b/pkg/cli/gen.go @@ -11,6 +11,7 @@ package cli import ( + "context" "crypto/rand" "fmt" "io" @@ -205,7 +206,7 @@ Output the list of cluster settings known to this binary. // Fill a Values struct with the defaults. s := cluster.MakeTestingClusterSettings() - settings.NewUpdater(&s.SV).ResetRemaining() + settings.NewUpdater(&s.SV).ResetRemaining(context.Background()) var rows [][]string for _, name := range settings.Keys() { diff --git a/pkg/cli/zip_test.go b/pkg/cli/zip_test.go index f185f5b89c97..dc7e199234e8 100644 --- a/pkg/cli/zip_test.go +++ b/pkg/cli/zip_test.go @@ -402,7 +402,7 @@ func TestPartialZip(t *testing.T) { // is no risk to see the override bumped due to a gossip update // because this setting is not otherwise set in the test cluster. s := tc.Server(0) - kvserver.TimeUntilStoreDead.Override(&s.ClusterSettings().SV, kvserver.TestTimeUntilStoreDead) + kvserver.TimeUntilStoreDead.Override(ctx, &s.ClusterSettings().SV, kvserver.TestTimeUntilStoreDead) // This last case may take a little while to converge. To make this work with datadriven and at the same // time retain the ability to use the `-rewrite` flag, we use a retry loop within that already checks the diff --git a/pkg/clusterversion/clusterversion.go b/pkg/clusterversion/clusterversion.go index d4a14d7499dd..56b614524884 100644 --- a/pkg/clusterversion/clusterversion.go +++ b/pkg/clusterversion/clusterversion.go @@ -201,7 +201,7 @@ func (v *handleImpl) SetActiveVersion(ctx context.Context, cv ClusterVersion) er return err } - version.SetInternal(v.sv, encoded) + version.SetInternal(ctx, v.sv, encoded) return nil } diff --git a/pkg/clusterversion/setting.go b/pkg/clusterversion/setting.go index 3d47498e5f2f..dc7a08197678 100644 --- a/pkg/clusterversion/setting.go +++ b/pkg/clusterversion/setting.go @@ -105,7 +105,7 @@ func (cv *clusterVersionSetting) initialize( if err != nil { return err } - cv.SetInternal(sv, encoded) + cv.SetInternal(ctx, sv, encoded) return nil } diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go index e08f7c8e7a42..ff1d4799c7cc 100644 --- a/pkg/jobs/job_scheduler_test.go +++ b/pkg/jobs/job_scheduler_test.go @@ -202,30 +202,31 @@ func TestJobSchedulerDaemonInitialScanDelay(t *testing.T) { func getScopedSettings() (*settings.Values, func()) { sv := &settings.Values{} - sv.Init(nil) + sv.Init(context.Background(), nil) return sv, settings.TestingSaveRegistry() } func TestJobSchedulerDaemonGetWaitPeriod(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() sv, cleanup := getScopedSettings() defer cleanup() - schedulerEnabledSetting.Override(sv, false) + schedulerEnabledSetting.Override(ctx, sv, false) // When disabled, we wait 5 minutes before rechecking. require.EqualValues(t, 5*time.Minute, getWaitPeriod(sv, nil)) - schedulerEnabledSetting.Override(sv, true) + schedulerEnabledSetting.Override(ctx, sv, true) // When pace is too low, we use something more reasonable. - schedulerPaceSetting.Override(sv, time.Nanosecond) + schedulerPaceSetting.Override(ctx, sv, time.Nanosecond) require.EqualValues(t, minPacePeriod, getWaitPeriod(sv, nil)) // Otherwise, we use user specified setting. pace := 42 * time.Second - schedulerPaceSetting.Override(sv, pace) + schedulerPaceSetting.Override(ctx, sv, pace) require.EqualValues(t, pace, getWaitPeriod(sv, nil)) } @@ -288,7 +289,7 @@ func TestJobSchedulerCanBeDisabledWhileSleeping(t *testing.T) { knobs := fastDaemonKnobs(func() time.Duration { // Disable daemon - schedulerEnabledSetting.Override(&h.cfg.Settings.SV, false) + schedulerEnabledSetting.Override(ctx, &h.cfg.Settings.SV, false) // Before we return, create a job which should not be executed // (since the daemon is disabled). We use our special executor @@ -422,7 +423,7 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) { // Advance our fake time 1 hour forward (plus a bit) so that the daemon finds matching jobs. h.env.AdvanceTime(time.Hour + time.Second) const jobsPerIteration = 2 - schedulerMaxJobsPerIterationSetting.Override(&h.cfg.Settings.SV, jobsPerIteration) + schedulerMaxJobsPerIterationSetting.Override(ctx, &h.cfg.Settings.SV, jobsPerIteration) // Make daemon execute initial scan immediately, but block subsequent scans. h.cfg.TestingKnobs = fastDaemonKnobs(overridePaceSetting(time.Hour)) diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 145bfa51acfb..66566503f687 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -732,7 +732,7 @@ UPDATE system.jobs if err := stopper.RunAsyncTask(context.Background(), "jobs/gc", func(ctx context.Context) { ctx, cancel := stopper.WithCancelOnQuiesce(ctx) settingChanged := make(chan struct{}, 1) - gcSetting.SetOnChange(&r.settings.SV, func() { + gcSetting.SetOnChange(&r.settings.SV, func(ctx context.Context) { select { case settingChanged <- struct{}{}: default: diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go index 3cfeb3f61089..4ca99662a0b0 100644 --- a/pkg/jobs/registry_external_test.go +++ b/pkg/jobs/registry_external_test.go @@ -96,11 +96,11 @@ func TestRegistryResumeExpiredLease(t *testing.T) { defer s.Stopper().Stop(ctx) // Disable leniency for instant expiration - jobs.LeniencySetting.Override(&s.ClusterSettings().SV, 0) + jobs.LeniencySetting.Override(ctx, &s.ClusterSettings().SV, 0) const cancelInterval = time.Duration(math.MaxInt64) const adoptInterval = time.Microsecond - slinstance.DefaultTTL.Override(&s.ClusterSettings().SV, 2*adoptInterval) - slinstance.DefaultHeartBeat.Override(&s.ClusterSettings().SV, adoptInterval) + slinstance.DefaultTTL.Override(ctx, &s.ClusterSettings().SV, 2*adoptInterval) + slinstance.DefaultHeartBeat.Override(ctx, &s.ClusterSettings().SV, adoptInterval) db := s.DB() clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 7344ed787bdd..eb13c91b0309 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -379,7 +379,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { ds.clusterID = &cfg.RPCContext.ClusterID ds.asyncSenderSem = quotapool.NewIntPool("DistSender async concurrency", uint64(senderConcurrencyLimit.Get(&cfg.Settings.SV))) - senderConcurrencyLimit.SetOnChange(&cfg.Settings.SV, func() { + senderConcurrencyLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) { ds.asyncSenderSem.UpdateCapacity(uint64(senderConcurrencyLimit.Get(&cfg.Settings.SV))) }) ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper")) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 9b797c71bacb..c19659d09041 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -2428,7 +2428,7 @@ func TestPutsInStagingTxn(t *testing.T) { // DistSender are send serially and the transaction is updated from one to // another. See below. settings := cluster.MakeTestingClusterSettings() - senderConcurrencyLimit.Override(&settings.SV, 0) + senderConcurrencyLimit.Override(ctx, &settings.SV, 0) s, _, db := serverutils.StartServer(t, base.TestServerArgs{ diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go index 5938c35db665..e285a019bdf5 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go @@ -169,7 +169,7 @@ func TestTxnCommitterStripsInFlightWrites(t *testing.T) { defer tc.stopper.Stop(ctx) // Start with parallel commits disabled. Should NOT attach in-flight writes. - parallelCommitsEnabled.Override(&tc.st.SV, false) + parallelCommitsEnabled.Override(ctx, &tc.st.SV, false) txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") @@ -211,7 +211,7 @@ func TestTxnCommitterStripsInFlightWrites(t *testing.T) { require.NotNil(t, br) // Enable parallel commits and send the same batch. Should attach in-flight writes. - parallelCommitsEnabled.Override(&tc.st.SV, true) + parallelCommitsEnabled.Override(ctx, &tc.st.SV, true) ba.Requests = nil etArgsCopy = etArgs diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index f0fbec9255bf..08432c3ca8f9 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -567,9 +567,9 @@ func TestTxnPipelinerManyWrites(t *testing.T) { // Disable write_pipelining_max_outstanding_size, // write_pipelining_max_batch_size, and max_intents_bytes limits. - pipelinedWritesMaxInFlightSize.Override(&tp.st.SV, math.MaxInt64) - pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 0) - trackedWritesMaxSize.Override(&tp.st.SV, math.MaxInt64) + pipelinedWritesMaxInFlightSize.Override(ctx, &tp.st.SV, math.MaxInt64) + pipelinedWritesMaxBatchSize.Override(ctx, &tp.st.SV, 0) + trackedWritesMaxSize.Override(ctx, &tp.st.SV, math.MaxInt64) const writes = 2048 keyBuf := roachpb.Key(strings.Repeat("a", writes+1)) @@ -835,7 +835,7 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) { tp, mockSender := makeMockTxnPipeliner() // Start with pipelining disabled. Should NOT use async consensus. - pipelinedWritesEnabled.Override(&tp.st.SV, false) + pipelinedWritesEnabled.Override(ctx, &tp.st.SV, false) txn := makeTxnProto() keyA, keyC := roachpb.Key("a"), roachpb.Key("c") @@ -862,7 +862,7 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) { require.Equal(t, 0, tp.ifWrites.len()) // Enable pipelining. Should use async consensus. - pipelinedWritesEnabled.Override(&tp.st.SV, true) + pipelinedWritesEnabled.Override(ctx, &tp.st.SV, true) ba.Requests = nil putArgs2 := roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}} @@ -890,7 +890,7 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) { // Disable pipelining again. Should NOT use async consensus but should still // make sure to chain on to any overlapping in-flight writes. - pipelinedWritesEnabled.Override(&tp.st.SV, false) + pipelinedWritesEnabled.Override(ctx, &tp.st.SV, false) ba.Requests = nil putArgs4 := roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}} @@ -963,7 +963,7 @@ func TestTxnPipelinerMaxInFlightSize(t *testing.T) { tp, mockSender := makeMockTxnPipeliner() // Set maxInFlightSize limit to 3 bytes. - pipelinedWritesMaxInFlightSize.Override(&tp.st.SV, 3) + pipelinedWritesMaxInFlightSize.Override(ctx, &tp.st.SV, 3) txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") @@ -1102,7 +1102,7 @@ func TestTxnPipelinerMaxInFlightSize(t *testing.T) { require.Equal(t, int64(0), tp.ifWrites.byteSize()) // Increase maxInFlightSize limit to 5 bytes. - pipelinedWritesMaxInFlightSize.Override(&tp.st.SV, 5) + pipelinedWritesMaxInFlightSize.Override(ctx, &tp.st.SV, 5) // The original batch with 4 writes should succeed. ba.Requests = nil @@ -1140,7 +1140,7 @@ func TestTxnPipelinerMaxBatchSize(t *testing.T) { tp, mockSender := makeMockTxnPipeliner() // Set maxBatchSize limit to 1. - pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 1) + pipelinedWritesMaxBatchSize.Override(ctx, &tp.st.SV, 1) txn := makeTxnProto() keyA, keyC := roachpb.Key("a"), roachpb.Key("c") @@ -1189,7 +1189,7 @@ func TestTxnPipelinerMaxBatchSize(t *testing.T) { require.Equal(t, 0, tp.ifWrites.len()) // Increase maxBatchSize limit to 2. - pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 2) + pipelinedWritesMaxBatchSize.Override(ctx, &tp.st.SV, 2) // Same batch now below limit. mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { @@ -1400,6 +1400,7 @@ func TestTxnPipelinerSavepoints(t *testing.T) { func TestTxnPipelinerCondenseLockSpans(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() a := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key(nil)} b := roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key(nil)} c := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key(nil)} @@ -1445,7 +1446,7 @@ func TestTxnPipelinerCondenseLockSpans(t *testing.T) { descDB := mockRangeDescriptorDBForDescs(descs...) s := createTestDB(t) st := s.Store.ClusterSettings() - trackedWritesMaxSize.Override(&st.SV, 10) /* 10 bytes and it will condense */ + trackedWritesMaxSize.Override(ctx, &st.SV, 10) /* 10 bytes and it will condense */ defer s.Stop() // Check end transaction locks, which should be condensed and split @@ -1488,7 +1489,6 @@ func TestTxnPipelinerCondenseLockSpans(t *testing.T) { ds, ) db := kv.NewDB(ambient, tsf, s.Clock, s.Stopper()) - ctx := context.Background() txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) // Disable txn pipelining so that all write spans are immediately diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go index 905ac3c73523..3c1858d4da62 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go @@ -859,7 +859,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { keyD, keyE := roachpb.Key("d"), roachpb.Key("e") // Set MaxTxnRefreshSpansBytes limit to 3 bytes. - MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 3) + MaxTxnRefreshSpansBytes.Override(ctx, &tsr.st.SV, 3) // Send a batch below the limit. var ba roachpb.BatchRequest @@ -1084,7 +1084,7 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { keyC, keyD := roachpb.Key("c"), roachpb.Key("d") // Set MaxTxnRefreshSpansBytes limit to 3 bytes. - MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 3) + MaxTxnRefreshSpansBytes.Override(ctx, &tsr.st.SV, 3) // Send a batch below the limit. var ba roachpb.BatchRequest diff --git a/pkg/kv/kvprober/kvprober_integration_test.go b/pkg/kv/kvprober/kvprober_integration_test.go index 433da035ba41..3f52c66899d2 100644 --- a/pkg/kv/kvprober/kvprober_integration_test.go +++ b/pkg/kv/kvprober/kvprober_integration_test.go @@ -47,7 +47,7 @@ func TestProberDoesReads(t *testing.T) { s, _, p, cleanup := initTestProber(t, base.TestingKnobs{}) defer cleanup() - kvprober.ReadInterval.Override(&s.ClusterSettings().SV, 5*time.Millisecond) + kvprober.ReadInterval.Override(ctx, &s.ClusterSettings().SV, 5*time.Millisecond) require.NoError(t, p.Start(ctx, s.Stopper())) @@ -61,8 +61,8 @@ func TestProberDoesReads(t *testing.T) { s, _, p, cleanup := initTestProber(t, base.TestingKnobs{}) defer cleanup() - kvprober.ReadEnabled.Override(&s.ClusterSettings().SV, true) - kvprober.ReadInterval.Override(&s.ClusterSettings().SV, 5*time.Millisecond) + kvprober.ReadEnabled.Override(ctx, &s.ClusterSettings().SV, true) + kvprober.ReadInterval.Override(ctx, &s.ClusterSettings().SV, 5*time.Millisecond) require.NoError(t, p.Start(ctx, s.Stopper())) @@ -92,8 +92,8 @@ func TestProberDoesReads(t *testing.T) { }) defer cleanup() - kvprober.ReadEnabled.Override(&s.ClusterSettings().SV, true) - kvprober.ReadInterval.Override(&s.ClusterSettings().SV, 5*time.Millisecond) + kvprober.ReadEnabled.Override(ctx, &s.ClusterSettings().SV, true) + kvprober.ReadInterval.Override(ctx, &s.ClusterSettings().SV, 5*time.Millisecond) require.NoError(t, p.Start(ctx, s.Stopper())) @@ -139,8 +139,8 @@ func TestProberDoesReads(t *testing.T) { dbIsAvailable = false mu.Unlock() - kvprober.ReadEnabled.Override(&s.ClusterSettings().SV, true) - kvprober.ReadInterval.Override(&s.ClusterSettings().SV, 5*time.Millisecond) + kvprober.ReadEnabled.Override(ctx, &s.ClusterSettings().SV, true) + kvprober.ReadInterval.Override(ctx, &s.ClusterSettings().SV, 5*time.Millisecond) // Probe exactly ten times so we can make assertions below. for i := 0; i < 10; i++ { @@ -220,7 +220,7 @@ func initTestProber( }) // Given small test cluster, this better exercises the planning logic. - kvprober.NumStepsToPlanAtOnce.Override(&s.ClusterSettings().SV, 10) + kvprober.NumStepsToPlanAtOnce.Override(context.Background(), &s.ClusterSettings().SV, 10) // Want these tests to run as fast as possible; see planner_test.go for a // unit test of the rate limiting. p.SetPlanningRateLimit(0) diff --git a/pkg/kv/kvprober/kvprober_test.go b/pkg/kv/kvprober/kvprober_test.go index fbcfc8ed9766..e9d645642f6f 100644 --- a/pkg/kv/kvprober/kvprober_test.go +++ b/pkg/kv/kvprober/kvprober_test.go @@ -49,7 +49,7 @@ func TestProbe(t *testing.T) { t.Run("happy path", func(t *testing.T) { m := &mock{t: t} p := initTestProber(m) - readEnabled.Override(&p.settings.SV, true) + readEnabled.Override(ctx, &p.settings.SV, true) p.probe(ctx, m) @@ -66,7 +66,7 @@ func TestProbe(t *testing.T) { noGet: true, } p := initTestProber(m) - readEnabled.Override(&p.settings.SV, true) + readEnabled.Override(ctx, &p.settings.SV, true) p.probe(ctx, m) @@ -82,7 +82,7 @@ func TestProbe(t *testing.T) { getErr: fmt.Errorf("inject get failure"), } p := initTestProber(m) - readEnabled.Override(&p.settings.SV, true) + readEnabled.Override(ctx, &p.settings.SV, true) p.probe(ctx, m) diff --git a/pkg/kv/kvprober/planner_test.go b/pkg/kv/kvprober/planner_test.go index 40384efa3804..23b0d36927fa 100644 --- a/pkg/kv/kvprober/planner_test.go +++ b/pkg/kv/kvprober/planner_test.go @@ -115,10 +115,11 @@ func TestPlannerEnforcesRateLimit(t *testing.T) { } func TestGetRateLimit(t *testing.T) { + ctx := context.Background() s := cluster.MakeTestingClusterSettings() - readInterval.Override(&s.SV, time.Second) - numStepsToPlanAtOnce.Override(&s.SV, 60) + readInterval.Override(ctx, &s.SV, time.Second) + numStepsToPlanAtOnce.Override(ctx, &s.SV, 60) got := getRateLimitImpl(s) require.Equal(t, 30*time.Second, got) diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index ca13280ae542..dbddec49aab6 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -4472,8 +4472,9 @@ func TestLoadBasedLeaseRebalanceScore(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() st := cluster.MakeTestingClusterSettings() - enableLoadBasedLeaseRebalancing.Override(&st.SV, true) + enableLoadBasedLeaseRebalancing.Override(ctx, &st.SV, true) remoteStore := roachpb.StoreDescriptor{ Node: roachpb.NodeDescriptor{ @@ -6602,7 +6603,7 @@ func TestAllocatorFullDisks(t *testing.T) { server := rpc.NewServer(rpcContext) // never started g := gossip.NewTest(1, rpcContext, server, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef()) - TimeUntilStoreDead.Override(&st.SV, TestTimeUntilStoreDeadOff) + TimeUntilStoreDead.Override(ctx, &st.SV, TestTimeUntilStoreDeadOff) const generations = 100 const nodes = 20 @@ -6722,6 +6723,7 @@ func Example_rebalancing() { stopper := stop.NewStopper() defer stopper.Stop(context.Background()) + ctx := context.Background() st := cluster.MakeTestingClusterSettings() clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) @@ -6738,7 +6740,7 @@ func Example_rebalancing() { server := rpc.NewServer(rpcContext) // never started g := gossip.NewTest(1, rpcContext, server, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef()) - TimeUntilStoreDead.Override(&st.SV, TestTimeUntilStoreDeadOff) + TimeUntilStoreDead.Override(ctx, &st.SV, TestTimeUntilStoreDeadOff) const generations = 100 const nodes = 20 @@ -6807,9 +6809,9 @@ func Example_rebalancing() { for j := 0; j < len(testStores); j++ { ts := &testStores[j] var rangeUsageInfo RangeUsageInfo - target, _, details, ok := alloc.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), nil, []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, nil, rangeUsageInfo, storeFilterThrottled) + target, _, details, ok := alloc.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, nil, rangeUsageInfo, storeFilterThrottled) if ok { - log.Infof(context.Background(), "rebalancing to %v; details: %s", target, details) + log.Infof(ctx, "rebalancing to %v; details: %s", target, details) testStores[j].rebalance(&testStores[int(target.StoreID)], alloc.randGen.Int63n(1<<20)) } } diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go index fcbcd7eb8bda..79eab5465f8f 100644 --- a/pkg/kv/kvserver/client_lease_test.go +++ b/pkg/kv/kvserver/client_lease_test.go @@ -485,7 +485,7 @@ func TestLeasePreferencesRebalance(t *testing.T) { settings := cluster.MakeTestingClusterSettings() sv := &settings.SV // set min lease transfer high, so we know it does affect the lease movement. - kvserver.MinLeaseTransferInterval.Override(sv, 24*time.Hour) + kvserver.MinLeaseTransferInterval.Override(ctx, sv, 24*time.Hour) // Place all the leases in us-west. zcfg := zonepb.DefaultZoneConfig() zcfg.LeasePreferences = []zonepb.LeasePreference{ diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 576df4fac7b4..e81cf019b4c9 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -4091,8 +4091,8 @@ func TestMergeQueue(t *testing.T) { zoneConfig.RangeMinBytes = &rangeMinBytes settings := cluster.MakeTestingClusterSettings() sv := &settings.SV - kvserverbase.MergeQueueEnabled.Override(sv, true) - kvserver.MergeQueueInterval.Override(sv, 0) // process greedily + kvserverbase.MergeQueueEnabled.Override(ctx, sv, true) + kvserver.MergeQueueInterval.Override(ctx, sv, 0) // process greedily tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ @@ -4114,7 +4114,7 @@ func TestMergeQueue(t *testing.T) { store := tc.GetFirstStoreFromServer(t, 0) // The cluster with manual replication disables the merge queue, // so we need to re-enable. - kvserverbase.MergeQueueEnabled.Override(sv, true) + kvserverbase.MergeQueueEnabled.Override(ctx, sv, true) store.SetMergeQueueActive(true) split := func(t *testing.T, key roachpb.Key, expirationTime hlc.Timestamp) { @@ -4174,7 +4174,7 @@ func TestMergeQueue(t *testing.T) { // Disable load-based splitting, so that the absence of sufficient QPS // measurements do not prevent ranges from merging. Certain subtests // re-enable the functionality. - kvserver.SplitByLoadEnabled.Override(sv, false) + kvserver.SplitByLoadEnabled.Override(ctx, sv, false) store.MustForceMergeScanAndProcess() // drain any merges that might already be queued split(t, rhsStartKey.AsRawKey(), hlc.Timestamp{} /* expirationTime */) } @@ -4270,13 +4270,13 @@ func TestMergeQueue(t *testing.T) { // measurement from both sides to be sufficiently stable and reliable, // meaning that it was a maximum measurement over some extended period of // time. - kvserver.SplitByLoadEnabled.Override(sv, true) - kvserver.SplitByLoadQPSThreshold.Override(sv, splitByLoadQPS) + kvserver.SplitByLoadEnabled.Override(ctx, sv, true) + kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, splitByLoadQPS) // Drop the load-based splitting merge delay setting, which also dictates // the duration that a leaseholder must measure QPS before considering its // measurements to be reliable enough to base range merging decisions on. - kvserverbase.SplitByLoadMergeDelay.Override(sv, splitByLoadMergeDelay) + kvserverbase.SplitByLoadMergeDelay.Override(ctx, sv, splitByLoadMergeDelay) // Reset both range's load-based splitters, so that QPS measurements do // not leak over between subtests. Then, bump the manual clock so that diff --git a/pkg/kv/kvserver/closedts/container/container_test.go b/pkg/kv/kvserver/closedts/container/container_test.go index 05b20b6f8837..0b120ea9225a 100644 --- a/pkg/kv/kvserver/closedts/container/container_test.go +++ b/pkg/kv/kvserver/closedts/container/container_test.go @@ -78,8 +78,8 @@ func prepareContainer() *TestContainer { // Set the target duration to a second and the close fraction so small // that the Provider will essentially close in a hot loop. In this test // we'll block in the clock to pace the Provider's closer loop. - closedts.TargetDuration.Override(&st.SV, time.Second) - closedts.CloseFraction.Override(&st.SV, 1e-9) + closedts.TargetDuration.Override(context.Background(), &st.SV, time.Second) + closedts.CloseFraction.Override(context.Background(), &st.SV, 1e-9) // We perform a little dance with the Dialer. It needs to be hooked up to the // Server, but that's only created in NewContainer. The Dialer isn't used until diff --git a/pkg/kv/kvserver/closedts/provider/provider.go b/pkg/kv/kvserver/closedts/provider/provider.go index 8ff26b11f630..00bdfa671d8a 100644 --- a/pkg/kv/kvserver/closedts/provider/provider.go +++ b/pkg/kv/kvserver/closedts/provider/provider.go @@ -117,7 +117,7 @@ func (p *Provider) runCloser(ctx context.Context) { defer close(ch) confCh := make(chan struct{}, 1) - confChanged := func() { + confChanged := func(ctx context.Context) { select { case confCh <- struct{}{}: default: diff --git a/pkg/kv/kvserver/closedts/provider/provider_test.go b/pkg/kv/kvserver/closedts/provider/provider_test.go index 097415178779..fde9caa3cdf7 100644 --- a/pkg/kv/kvserver/closedts/provider/provider_test.go +++ b/pkg/kv/kvserver/closedts/provider/provider_test.go @@ -48,8 +48,8 @@ func TestProviderSubscribeNotify(t *testing.T) { // We'll only unleash the closer loop when the test is basically done, and // once we do that we want it to run aggressively. // Testing that the closer loop works as advertised is left to another test. - closedts.TargetDuration.Override(&st.SV, time.Millisecond) - closedts.CloseFraction.Override(&st.SV, 1.0) + closedts.TargetDuration.Override(ctx, &st.SV, time.Millisecond) + closedts.CloseFraction.Override(ctx, &st.SV, 1.0) storage := &providertestutils.TestStorage{} unblockClockCh := make(chan struct{}) @@ -240,10 +240,11 @@ func TestProviderSubscribeNotify(t *testing.T) { // handled concurrent subscriptions. func TestProviderSubscribeConcurrent(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() st := cluster.MakeTestingClusterSettings() - closedts.TargetDuration.Override(&st.SV, time.Millisecond) - closedts.CloseFraction.Override(&st.SV, 1.0) + closedts.TargetDuration.Override(ctx, &st.SV, time.Millisecond) + closedts.CloseFraction.Override(ctx, &st.SV, 1.0) stopper := stop.NewStopper() storage := &providertestutils.TestStorage{} @@ -296,10 +297,11 @@ func TestProviderSubscribeConcurrent(t *testing.T) { func TestProviderTargetDurationSetting(t *testing.T) { defer leaktest.AfterTest(t)() skip.WithIssue(t, closedts.IssueTrackingRemovalOfOldClosedTimestampsCode) + ctx := context.Background() st := cluster.MakeTestingClusterSettings() - closedts.TargetDuration.Override(&st.SV, time.Millisecond) - closedts.CloseFraction.Override(&st.SV, 1.0) + closedts.TargetDuration.Override(ctx, &st.SV, time.Millisecond) + closedts.CloseFraction.Override(ctx, &st.SV, 1.0) stopper := stop.NewStopper() storage := &providertestutils.TestStorage{} @@ -318,7 +320,7 @@ func TestProviderTargetDurationSetting(t *testing.T) { }, Close: func(next hlc.Timestamp, expCurEpoch ctpb.Epoch) (hlc.Timestamp, map[roachpb.RangeID]ctpb.LAI, bool) { if called++; called == 1 { - closedts.TargetDuration.Override(&st.SV, 0) + closedts.TargetDuration.Override(ctx, &st.SV, 0) } select { case calledCh <- struct{}{}: @@ -345,6 +347,6 @@ func TestProviderTargetDurationSetting(t *testing.T) { t.Fatal("expected no updates to be sent") case <-time.After(someTime): } - closedts.TargetDuration.Override(&st.SV, time.Millisecond) + closedts.TargetDuration.Override(ctx, &st.SV, time.Millisecond) <-calledCh } diff --git a/pkg/kv/kvserver/closedts/sidetransport/sender.go b/pkg/kv/kvserver/closedts/sidetransport/sender.go index e7237dc20fb4..85b14296f0e3 100644 --- a/pkg/kv/kvserver/closedts/sidetransport/sender.go +++ b/pkg/kv/kvserver/closedts/sidetransport/sender.go @@ -224,7 +224,7 @@ func (s *Sender) Run(ctx context.Context, nodeID roachpb.NodeID) { waitForUpgrade := !s.st.Version.IsActive(ctx, clusterversion.ClosedTimestampsRaftTransport) confCh := make(chan struct{}, 1) - confChanged := func() { + confChanged := func(ctx context.Context) { select { case confCh <- struct{}{}: default: diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index 9bcb1f21bddf..4d2a6ef5bf8c 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -834,17 +834,17 @@ func (c *cluster) detectDeadlocks() { } func (c *cluster) enableTxnPushes() { - concurrency.LockTableLivenessPushDelay.Override(&c.st.SV, 0*time.Millisecond) - concurrency.LockTableDeadlockDetectionPushDelay.Override(&c.st.SV, 0*time.Millisecond) + concurrency.LockTableLivenessPushDelay.Override(context.Background(), &c.st.SV, 0*time.Millisecond) + concurrency.LockTableDeadlockDetectionPushDelay.Override(context.Background(), &c.st.SV, 0*time.Millisecond) } func (c *cluster) disableTxnPushes() { - concurrency.LockTableLivenessPushDelay.Override(&c.st.SV, time.Hour) - concurrency.LockTableDeadlockDetectionPushDelay.Override(&c.st.SV, time.Hour) + concurrency.LockTableLivenessPushDelay.Override(context.Background(), &c.st.SV, time.Hour) + concurrency.LockTableDeadlockDetectionPushDelay.Override(context.Background(), &c.st.SV, time.Hour) } func (c *cluster) setDiscoveredLocksThresholdToConsultFinalizedTxnCache(n int) { - concurrency.DiscoveredLocksThresholdToConsultFinalizedTxnCache.Override(&c.st.SV, int64(n)) + concurrency.DiscoveredLocksThresholdToConsultFinalizedTxnCache.Override(context.Background(), &c.st.SV, int64(n)) } // reset clears all request state in the cluster. This avoids portions of tests diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index 676a63a12885..d76cb04c3a46 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -101,8 +101,8 @@ var lockTableWaiterTestClock = hlc.Timestamp{WallTime: 12} func setupLockTableWaiterTest() (*lockTableWaiterImpl, *mockIntentResolver, *mockLockTableGuard) { ir := &mockIntentResolver{} st := cluster.MakeTestingClusterSettings() - LockTableLivenessPushDelay.Override(&st.SV, 0) - LockTableDeadlockDetectionPushDelay.Override(&st.SV, 0) + LockTableLivenessPushDelay.Override(context.Background(), &st.SV, 0) + LockTableDeadlockDetectionPushDelay.Override(context.Background(), &st.SV, 0) manual := hlc.NewManualClock(lockTableWaiterTestClock.WallTime) guard := &mockLockTableGuard{ signal: make(chan struct{}, 1), diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 29e93256a6b2..bcb9079be99e 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -219,7 +219,7 @@ func (s *Store) RequestClosedTimestamp(nodeID roachpb.NodeID, rangeID roachpb.Ra } func NewTestStorePool(cfg StoreConfig) *StorePool { - TimeUntilStoreDead.Override(&cfg.Settings.SV, TestTimeUntilStoreDeadOff) + TimeUntilStoreDead.Override(context.Background(), &cfg.Settings.SV, TestTimeUntilStoreDeadOff) return NewStorePool( cfg.AmbientCtx, cfg.Settings, diff --git a/pkg/kv/kvserver/liveness/client_test.go b/pkg/kv/kvserver/liveness/client_test.go index 06404e5283e7..80019481dc8d 100644 --- a/pkg/kv/kvserver/liveness/client_test.go +++ b/pkg/kv/kvserver/liveness/client_test.go @@ -251,8 +251,7 @@ func TestNodeLivenessStatusMap(t *testing.T) { // doesn't allow durations below 1m15s, which is much too long // for a test. // We do this in every SucceedsSoon attempt, so we'll be good. - kvserver.TimeUntilStoreDead.Override(&firstServer.ClusterSettings().SV, - kvserver.TestTimeUntilStoreDead) + kvserver.TimeUntilStoreDead.Override(ctx, &firstServer.ClusterSettings().SV, kvserver.TestTimeUntilStoreDead) log.Infof(ctx, "checking expected status (%s) for node %d", expectedStatus, nodeID) resp, err := admin.Liveness(ctx, &serverpb.LivenessRequest{}) diff --git a/pkg/kv/kvserver/merge_queue_test.go b/pkg/kv/kvserver/merge_queue_test.go index f294f011d91a..621f64928af1 100644 --- a/pkg/kv/kvserver/merge_queue_test.go +++ b/pkg/kv/kvserver/merge_queue_test.go @@ -38,7 +38,7 @@ func TestMergeQueueShouldQueue(t *testing.T) { testCtx.Start(t, stopper) mq := newMergeQueue(testCtx.store, testCtx.store.DB(), testCtx.gossip) - kvserverbase.MergeQueueEnabled.Override(&testCtx.store.ClusterSettings().SV, true) + kvserverbase.MergeQueueEnabled.Override(ctx, &testCtx.store.ClusterSettings().SV, true) tableKey := func(i uint32) []byte { return keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + i) diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache.go b/pkg/kv/kvserver/protectedts/ptcache/cache.go index b4102872520f..d3f517c0a9e4 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache.go @@ -143,7 +143,7 @@ func (c *Cache) Start(ctx context.Context, stopper *stop.Stopper) error { func (c *Cache) periodicallyRefreshProtectedtsCache(ctx context.Context) { settingChanged := make(chan struct{}, 1) - protectedts.PollInterval.SetOnChange(&c.settings.SV, func() { + protectedts.PollInterval.SetOnChange(&c.settings.SV, func(ctx context.Context) { select { case settingChanged <- struct{}{}: default: diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go index 38d533282482..ab7e7f48f16d 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go @@ -51,7 +51,7 @@ func TestCacheBasic(t *testing.T) { s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) // Set the poll interval to be very short. - protectedts.PollInterval.Override(&s.ClusterSettings().SV, 500*time.Microsecond) + protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Microsecond) c := ptcache.New(ptcache.Config{ Settings: s.ClusterSettings(), @@ -120,7 +120,7 @@ func TestRefresh(t *testing.T) { s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) // Set the poll interval to be very long. - protectedts.PollInterval.Override(&s.ClusterSettings().SV, 500*time.Hour) + protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ Settings: s.ClusterSettings(), @@ -227,7 +227,7 @@ func TestStart(t *testing.T) { p := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor)) // Set the poll interval to be very long. - protectedts.PollInterval.Override(&s.ClusterSettings().SV, 500*time.Hour) + protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ Settings: s.ClusterSettings(), DB: s.DB(), @@ -260,7 +260,7 @@ func TestQueryRecord(t *testing.T) { p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) // Set the poll interval to be very long. - protectedts.PollInterval.Override(&s.ClusterSettings().SV, 500*time.Hour) + protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ Settings: s.ClusterSettings(), DB: s.DB(), @@ -318,7 +318,7 @@ func TestIterate(t *testing.T) { s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) // Set the poll interval to be very long. - protectedts.PollInterval.Override(&s.ClusterSettings().SV, 500*time.Hour) + protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ Settings: s.ClusterSettings(), @@ -383,7 +383,7 @@ func TestSettingChangedLeadsToFetch(t *testing.T) { s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) // Set the poll interval to be very long. - protectedts.PollInterval.Override(&s.ClusterSettings().SV, 500*time.Hour) + protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ Settings: s.ClusterSettings(), @@ -400,7 +400,7 @@ func TestSettingChangedLeadsToFetch(t *testing.T) { _, asOf := c.QueryRecord(ctx, uuid.UUID{}) require.Equal(t, asOf, ts) // Set the polling interval back to something very short. - protectedts.PollInterval.Override(&s.ClusterSettings().SV, 100*time.Microsecond) + protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 100*time.Microsecond) // Ensure that the state is updated again soon. waitForAsOfAfter(t, c, ts) } diff --git a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go index 49a8fcb2dfec..9ecccc9608f7 100644 --- a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go +++ b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go @@ -103,7 +103,7 @@ func (r *Reconciler) Start(ctx context.Context, stopper *stop.Stopper) error { func (r *Reconciler) run(ctx context.Context, stopper *stop.Stopper) { reconcileIntervalChanged := make(chan struct{}, 1) - ReconcileInterval.SetOnChange(&r.settings.SV, func() { + ReconcileInterval.SetOnChange(&r.settings.SV, func(ctx context.Context) { select { case reconcileIntervalChanged <- struct{}{}: default: diff --git a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go index 2323357c88c5..01dc39a6ebe3 100644 --- a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go +++ b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go @@ -88,7 +88,7 @@ func TestReconciler(t *testing.T) { })) t.Run("update settings", func(t *testing.T) { - ptreconcile.ReconcileInterval.Override(&settings.SV, time.Millisecond) + ptreconcile.ReconcileInterval.Override(ctx, &settings.SV, time.Millisecond) testutils.SucceedsSoon(t, func() error { require.Equal(t, int64(0), r.Metrics().RecordsRemoved.Count()) require.Equal(t, int64(0), r.Metrics().ReconciliationErrors.Count()) @@ -103,7 +103,7 @@ func TestReconciler(t *testing.T) { state.toRemove[recMeta] = struct{}{} state.mu.Unlock() - ptreconcile.ReconcileInterval.Override(&settings.SV, time.Millisecond) + ptreconcile.ReconcileInterval.Override(ctx, &settings.SV, time.Millisecond) testutils.SucceedsSoon(t, func() error { require.Equal(t, int64(0), r.Metrics().ReconciliationErrors.Count()) if removed := r.Metrics().RecordsRemoved.Count(); removed != 1 { diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index 29b73007096b..9c1685453a16 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -978,6 +978,7 @@ func (r mvccStatsReplicaInQueue) GetMVCCStats() enginepb.MVCCStats { func TestQueueRateLimitedTimeoutFunc(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() type testCase struct { guaranteedProcessingTime time.Duration rateLimit int64 // bytes/s @@ -987,8 +988,8 @@ func TestQueueRateLimitedTimeoutFunc(t *testing.T) { makeTest := func(tc testCase) (string, func(t *testing.T)) { return fmt.Sprintf("%+v", tc), func(t *testing.T) { st := cluster.MakeTestingClusterSettings() - queueGuaranteedProcessingTimeBudget.Override(&st.SV, tc.guaranteedProcessingTime) - recoverySnapshotRate.Override(&st.SV, tc.rateLimit) + queueGuaranteedProcessingTimeBudget.Override(ctx, &st.SV, tc.guaranteedProcessingTime) + recoverySnapshotRate.Override(ctx, &st.SV, tc.rateLimit) tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate) repl := mvccStatsReplicaInQueue{ size: tc.replicaSize, diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index cab69b202db3..4dd5cf5961fa 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -697,8 +697,8 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { mc := hlc.NewManualClock((1613588135 * time.Second).Nanoseconds()) clock := hlc.NewClock(mc.UnixNano, maxOffset) st := cluster.MakeTestingClusterSettings() - closedts.TargetDuration.Override(&st.SV, time.Second) - closedts.SideTransportCloseInterval.Override(&st.SV, 200*time.Millisecond) + closedts.TargetDuration.Override(ctx, &st.SV, time.Second) + closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 200*time.Millisecond) now := clock.NowAsClockTimestamp() nowTS := now.ToTimestamp() nowMinusClosedLag := nowTS.Add(-closedts.TargetDuration.Get(&st.SV).Nanoseconds(), 0) diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 8921a5add938..f0eb9979d337 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -98,8 +98,8 @@ func TestReplicaRangefeed(t *testing.T) { // Disable closed timestamps as this test was designed assuming no closed // timestamps would get propagated. settings := cluster.MakeTestingClusterSettings() - closedts.TargetDuration.Override(&settings.SV, 24*time.Hour) - kvserver.RangefeedEnabled.Override(&settings.SV, true) + closedts.TargetDuration.Override(ctx, &settings.SV, 24*time.Hour) + kvserver.RangefeedEnabled.Override(ctx, &settings.SV, true) args.ServerArgsPerNode[i] = base.TestServerArgs{Settings: settings} } tc := testcluster.StartTestCluster(t, numNodes, args) @@ -364,7 +364,7 @@ func TestReplicaRangefeedExpiringLeaseError(t *testing.T) { // immediately even if it didn't return the correct error. stream.Cancel() - kvserver.RangefeedEnabled.Override(&store.ClusterSettings().SV, true) + kvserver.RangefeedEnabled.Override(ctx, &store.ClusterSettings().SV, true) pErr := store.RangeFeed(&req, stream) const exp = "expiration-based leases are incompatible with rangefeeds" if !testutils.IsPError(pErr, exp) { @@ -749,7 +749,7 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { }, Span: rangefeedSpan, } - kvserver.RangefeedEnabled.Override(&store.ClusterSettings().SV, true) + kvserver.RangefeedEnabled.Override(ctx, &store.ClusterSettings().SV, true) pErr := store.RangeFeed(&req, stream) streamErrC <- pErr }() @@ -759,7 +759,7 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { // Disable rangefeeds, which stops logical op logs from being provided // with Raft commands. - kvserver.RangefeedEnabled.Override(&store.ClusterSettings().SV, false) + kvserver.RangefeedEnabled.Override(ctx, &store.ClusterSettings().SV, false) // Perform a write on the range. writeKey := encoding.EncodeStringAscending(keys.SystemSQLCodec.TablePrefix(55), "c") diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 48d16f6b15c1..4859f3aee3e4 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -9251,6 +9251,7 @@ func TestCommandTooLarge(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() tc := testContext{} stopper := stop.NewStopper() defer stopper.Stop(context.Background()) @@ -9258,7 +9259,7 @@ func TestCommandTooLarge(t *testing.T) { st := tc.store.cfg.Settings st.Manual.Store(true) - MaxCommandSize.Override(&st.SV, 1024) + MaxCommandSize.Override(ctx, &st.SV, 1024) args := putArgs(roachpb.Key("k"), []byte(strings.Repeat("a", int(MaxCommandSize.Get(&st.SV))))) @@ -10693,7 +10694,7 @@ func TestReplicaNotifyLockTableOn1PC(t *testing.T) { // Disable txn liveness pushes. See below for why. st := tc.store.cfg.Settings st.Manual.Store(true) - concurrency.LockTableLivenessPushDelay.Override(&st.SV, 24*time.Hour) + concurrency.LockTableLivenessPushDelay.Override(ctx, &st.SV, 24*time.Hour) // Write a value to a key A. key := roachpb.Key("a") @@ -12570,7 +12571,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) { cfg.RaftMaxCommittedSizePerReady = 1 // Set up tracing. tracer := tracing.NewTracer() - tracer.Configure(&cfg.Settings.SV) + tracer.Configure(ctx, &cfg.Settings.SV) cfg.AmbientCtx.Tracer = tracer // Below we set txnID to the value of the transaction we're going to force to @@ -12709,7 +12710,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) { cfg := TestStoreConfig(hlc.NewClock(hlc.UnixNano, time.Nanosecond)) // Set up tracing. tracer := tracing.NewTracer() - tracer.Configure(&cfg.Settings.SV) + tracer.Configure(ctx, &cfg.Settings.SV) tracer.AlwaysTrace() cfg.AmbientCtx.Tracer = tracer tc.StartWithStoreConfig(t, stopper, cfg) diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 4d5371cb2869..183a382c0413 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -56,6 +56,7 @@ func TestReplicateQueueRebalance(t *testing.T) { const numNodes = 5 + ctx := context.Background() tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ ReplicationMode: base.ReplicationAuto, @@ -70,7 +71,7 @@ func TestReplicateQueueRebalance(t *testing.T) { for _, server := range tc.Servers { st := server.ClusterSettings() st.Manual.Store(true) - kvserver.LoadBasedRebalancingMode.Override(&st.SV, int64(kvserver.LBRebalancingOff)) + kvserver.LoadBasedRebalancingMode.Override(ctx, &st.SV, int64(kvserver.LBRebalancingOff)) } const newRanges = 10 @@ -131,7 +132,7 @@ func TestReplicateQueueRebalance(t *testing.T) { if c < minReplicas { err := errors.Errorf( "not balanced (want at least %d replicas on all stores): %d", minReplicas, counts) - log.Infof(context.Background(), "%v", err) + log.Infof(ctx, "%v", err) return err } } diff --git a/pkg/kv/kvserver/reports/constraint_stats_report_test.go b/pkg/kv/kvserver/reports/constraint_stats_report_test.go index 0d2eeaedfe23..4f143bd3f620 100644 --- a/pkg/kv/kvserver/reports/constraint_stats_report_test.go +++ b/pkg/kv/kvserver/reports/constraint_stats_report_test.go @@ -567,7 +567,7 @@ func TestConstraintReport(t *testing.T) { // This test uses the cluster as a recipient for a report saved from outside // the cluster. We disable the cluster's own production of reports so that it // doesn't interfere with the test. - ReporterInterval.Override(&st.SV, 0) + ReporterInterval.Override(ctx, &st.SV, 0) s, _, db := serverutils.StartServer(t, base.TestServerArgs{Settings: st}) con := s.InternalExecutor().(sqlutil.InternalExecutor) defer s.Stopper().Stop(ctx) diff --git a/pkg/kv/kvserver/reports/critical_localities_report_test.go b/pkg/kv/kvserver/reports/critical_localities_report_test.go index 36cdae7fe6c0..0625239ce649 100644 --- a/pkg/kv/kvserver/reports/critical_localities_report_test.go +++ b/pkg/kv/kvserver/reports/critical_localities_report_test.go @@ -179,7 +179,7 @@ func TestCriticalLocalitiesSaving(t *testing.T) { // This test uses the cluster as a recipient for a report saved from outside // the cluster. We disable the cluster's own production of reports so that it // doesn't interfere with the test. - ReporterInterval.Override(&st.SV, 0) + ReporterInterval.Override(ctx, &st.SV, 0) s, _, db := serverutils.StartServer(t, base.TestServerArgs{Settings: st}) con := s.InternalExecutor().(sqlutil.InternalExecutor) defer s.Stopper().Stop(ctx) diff --git a/pkg/kv/kvserver/reports/replication_stats_report_test.go b/pkg/kv/kvserver/reports/replication_stats_report_test.go index 35b548cc38d5..0c1200a12a5f 100644 --- a/pkg/kv/kvserver/reports/replication_stats_report_test.go +++ b/pkg/kv/kvserver/reports/replication_stats_report_test.go @@ -36,7 +36,7 @@ func TestRangeReport(t *testing.T) { // This test uses the cluster as a recipient for a report saved from outside // the cluster. We disable the cluster's own production of reports so that it // doesn't interfere with the test. - ReporterInterval.Override(&st.SV, 0) + ReporterInterval.Override(ctx, &st.SV, 0) s, _, db := serverutils.StartServer(t, base.TestServerArgs{Settings: st}) con := s.InternalExecutor().(sqlutil.InternalExecutor) defer s.Stopper().Stop(ctx) diff --git a/pkg/kv/kvserver/reports/reporter.go b/pkg/kv/kvserver/reports/reporter.go index 7b35639e4fb7..8a29f1a557f1 100644 --- a/pkg/kv/kvserver/reports/reporter.go +++ b/pkg/kv/kvserver/reports/reporter.go @@ -105,7 +105,7 @@ func (stats *Reporter) reportInterval() (time.Duration, <-chan struct{}) { // Start the periodic calls to Update(). func (stats *Reporter) Start(ctx context.Context, stopper *stop.Stopper) { - ReporterInterval.SetOnChange(&stats.settings.SV, func() { + ReporterInterval.SetOnChange(&stats.settings.SV, func(ctx context.Context) { stats.frequencyMu.Lock() defer stats.frequencyMu.Unlock() // Signal the current waiter (if any), and prepare the channel for future diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index df25191631cf..142274140840 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -848,7 +848,7 @@ func NewStore( s.renewableLeasesSignal = make(chan struct{}) s.limiters.BulkIOWriteRate = rate.NewLimiter(rate.Limit(bulkIOWriteLimit.Get(&cfg.Settings.SV)), bulkIOWriteBurst) - bulkIOWriteLimit.SetOnChange(&cfg.Settings.SV, func() { + bulkIOWriteLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) { s.limiters.BulkIOWriteRate.SetLimit(rate.Limit(bulkIOWriteLimit.Get(&cfg.Settings.SV))) }) s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter( @@ -871,7 +871,7 @@ func NewStore( if exportCores < 1 { exportCores = 1 } - ExportRequestsLimit.SetOnChange(&cfg.Settings.SV, func() { + ExportRequestsLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) { limit := int(ExportRequestsLimit.Get(&cfg.Settings.SV)) if limit > exportCores { limit = exportCores @@ -881,13 +881,13 @@ func NewStore( s.limiters.ConcurrentAddSSTableRequests = limit.MakeConcurrentRequestLimiter( "addSSTableRequestLimiter", int(addSSTableRequestLimit.Get(&cfg.Settings.SV)), ) - addSSTableRequestLimit.SetOnChange(&cfg.Settings.SV, func() { + addSSTableRequestLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) { s.limiters.ConcurrentAddSSTableRequests.SetLimit(int(addSSTableRequestLimit.Get(&cfg.Settings.SV))) }) s.limiters.ConcurrentRangefeedIters = limit.MakeConcurrentRequestLimiter( "rangefeedIterLimiter", int(concurrentRangefeedItersLimit.Get(&cfg.Settings.SV)), ) - concurrentRangefeedItersLimit.SetOnChange(&cfg.Settings.SV, func() { + concurrentRangefeedItersLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) { s.limiters.ConcurrentRangefeedIters.SetLimit( int(concurrentRangefeedItersLimit.Get(&cfg.Settings.SV))) }) @@ -899,7 +899,7 @@ func NewStore( "SystemConfigUpdateQueue", quotapool.Limit(queueAdditionOnSystemConfigUpdateRate.Get(&cfg.Settings.SV)), queueAdditionOnSystemConfigUpdateBurst.Get(&cfg.Settings.SV)) - updateSystemConfigUpdateQueueLimits := func() { + updateSystemConfigUpdateQueueLimits := func(ctx context.Context) { s.systemConfigUpdateQueueRateLimiter.UpdateLimit( quotapool.Limit(queueAdditionOnSystemConfigUpdateRate.Get(&cfg.Settings.SV)), queueAdditionOnSystemConfigUpdateBurst.Get(&cfg.Settings.SV)) @@ -1584,7 +1584,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { consistencyCheckRate.Get(&s.ClusterSettings().SV)*consistencyCheckRateBurstFactor, quotapool.WithMinimumWait(consistencyCheckRateMinWait)) - consistencyCheckRate.SetOnChange(&s.ClusterSettings().SV, func() { + consistencyCheckRate.SetOnChange(&s.ClusterSettings().SV, func(ctx context.Context) { rate := consistencyCheckRate.Get(&s.ClusterSettings().SV) s.consistencyLimiter.UpdateLimit(quotapool.Limit(rate), rate*consistencyCheckRateBurstFactor) }) @@ -1826,7 +1826,7 @@ func (s *Store) startRangefeedUpdater(ctx context.Context) { st := s.cfg.Settings confCh := make(chan struct{}, 1) - confChanged := func() { + confChanged := func(ctx context.Context) { select { case confCh <- struct{}{}: default: diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index 307521adfc76..bce079ceeccd 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -113,7 +113,7 @@ func createTestStorePool( g := gossip.NewTest(1, rpcContext, server, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef()) mnl := newMockNodeLiveness(defaultNodeStatus) - TimeUntilStoreDead.Override(&st.SV, timeUntilStoreDeadValue) + TimeUntilStoreDead.Override(context.Background(), &st.SV, timeUntilStoreDeadValue) storePool := NewStorePool( log.AmbientContext{Tracer: st.Tracer}, st, diff --git a/pkg/kv/kvserver/tenantrate/factory.go b/pkg/kv/kvserver/tenantrate/factory.go index 378c1ddeb115..779611f56f01 100644 --- a/pkg/kv/kvserver/tenantrate/factory.go +++ b/pkg/kv/kvserver/tenantrate/factory.go @@ -11,6 +11,8 @@ package tenantrate import ( + "context" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/quotapool" @@ -56,7 +58,7 @@ func NewLimiterFactory(st *cluster.Settings, knobs *TestingKnobs) *LimiterFactor tenantMetrics: rl.metrics.tenantMetrics(roachpb.SystemTenantID), } for _, setting := range configSettings { - setting.SetOnChange(&st.SV, func() { + setting.SetOnChange(&st.SV, func(ctx context.Context) { config := ConfigFromSettings(st) rl.UpdateConfig(config) }) diff --git a/pkg/server/addjoin_test.go b/pkg/server/addjoin_test.go index c6d4e9fb4841..5eb0df28772f 100644 --- a/pkg/server/addjoin_test.go +++ b/pkg/server/addjoin_test.go @@ -34,7 +34,7 @@ func TestConsumeJoinToken(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() settings := cluster.MakeTestingClusterSettings() - sql.FeatureTLSAutoJoinEnabled.Override(&settings.SV, true) + sql.FeatureTLSAutoJoinEnabled.Override(ctx, &settings.SV, true) s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ Settings: settings, }) diff --git a/pkg/server/goroutinedumper/goroutinedumper_test.go b/pkg/server/goroutinedumper/goroutinedumper_test.go index 8a4aa66fa98d..f409887091b0 100644 --- a/pkg/server/goroutinedumper/goroutinedumper_test.go +++ b/pkg/server/goroutinedumper/goroutinedumper_test.go @@ -159,7 +159,7 @@ func TestHeuristic(t *testing.T) { ctx := context.Background() for _, v := range c.vals { currentTime = baseTime.Add(v.secs * time.Second) - numGoroutinesThreshold.Override(&st.SV, v.threshold) + numGoroutinesThreshold.Override(ctx, &st.SV, v.threshold) gd.MaybeDump(ctx, st, v.goroutines) } assert.Equal(t, c.expectedDumps, dumps) diff --git a/pkg/server/server.go b/pkg/server/server.go index 2ba695f17f0c..32035d4211ad 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -898,7 +898,7 @@ func (s *Server) startMonitoringForwardClockJumps(ctx context.Context) error { forwardJumpCheckEnabled := make(chan bool, 1) s.stopper.AddCloser(stop.CloserFn(func() { close(forwardJumpCheckEnabled) })) - forwardClockJumpCheckEnabled.SetOnChange(&s.st.SV, func() { + forwardClockJumpCheckEnabled.SetOnChange(&s.st.SV, func(context.Context) { forwardJumpCheckEnabled <- forwardClockJumpCheckEnabled.Get(&s.st.SV) }) @@ -1062,7 +1062,7 @@ func (s *Server) startPersistingHLCUpperBound( tickerFn func(d time.Duration) *time.Ticker, ) error { persistHLCUpperBoundIntervalCh := make(chan time.Duration, 1) - persistHLCUpperBoundInterval.SetOnChange(&s.st.SV, func() { + persistHLCUpperBoundInterval.SetOnChange(&s.st.SV, func(context.Context) { persistHLCUpperBoundIntervalCh <- persistHLCUpperBoundInterval.Get(&s.st.SV) }) @@ -1654,7 +1654,7 @@ func (s *Server) PreStart(ctx context.Context) error { } var graphiteOnce sync.Once - graphiteEndpoint.SetOnChange(&s.st.SV, func() { + graphiteEndpoint.SetOnChange(&s.st.SV, func(context.Context) { if graphiteEndpoint.Get(&s.st.SV) != "" { graphiteOnce.Do(func() { s.node.startGraphiteStatsExporter(s.st) diff --git a/pkg/server/server_import_ts_test.go b/pkg/server/server_import_ts_test.go index d765997bd290..095a3e0a2a13 100644 --- a/pkg/server/server_import_ts_test.go +++ b/pkg/server/server_import_ts_test.go @@ -56,7 +56,7 @@ func TestServerWithTimeseriesImport(t *testing.T) { // ingest the dump we just wrote. args := base.TestClusterArgs{} args.ServerArgs.Settings = cluster.MakeTestingClusterSettings() - ts.TimeseriesStorageEnabled.Override(&args.ServerArgs.Settings.SV, false) + ts.TimeseriesStorageEnabled.Override(ctx, &args.ServerArgs.Settings.SV, false) args.ServerArgs.Knobs.Server = &server.TestingKnobs{ ImportTimeseriesFile: path, } diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index 2751b58865c0..6ece1dfc33f7 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -111,12 +111,12 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { } else { log.Infof(ctx, "set cluster version to: %v", v) } - } else if err := u.Set(k, val, valType); err != nil { + } else if err := u.Set(ctx, k, val, valType); err != nil { log.Warningf(ctx, "failed to set setting %s to %s: %v", log.Safe(k), val, err) } }, rangefeed.WithInitialScan(func(ctx context.Context) { - u.ResetRemaining() + u.ResetRemaining(ctx) close(initialScanDone) }), rangefeed.WithOnInitialScanError(func( ctx context.Context, err error, diff --git a/pkg/server/settingswatcher/settings_watcher_external_test.go b/pkg/server/settingswatcher/settings_watcher_external_test.go index 9bd804e8764a..f223ce3de73d 100644 --- a/pkg/server/settingswatcher/settings_watcher_external_test.go +++ b/pkg/server/settingswatcher/settings_watcher_external_test.go @@ -95,8 +95,7 @@ func TestSettingWatcher(t *testing.T) { require.NoError(t, sw.Start(ctx)) // TestCluster randomizes the value of SeparatedIntentsEnabled, so set it to // the same as in fakeSettings for the subsequent equality check. - storage.SeparatedIntentsEnabled.Override( - &s0.ClusterSettings().SV, storage.SeparatedIntentsEnabled.Get(&fakeSettings.SV)) + storage.SeparatedIntentsEnabled.Override(ctx, &s0.ClusterSettings().SV, storage.SeparatedIntentsEnabled.Get(&fakeSettings.SV)) require.NoError(t, checkSettingsValuesMatch(s0.ClusterSettings(), fakeSettings)) for k, v := range toSet { tdb.Exec(t, "SET CLUSTER SETTING "+k+" = $1", v[1]) diff --git a/pkg/server/settingsworker.go b/pkg/server/settingsworker.go index 2bbaa593b0d4..e2d19fbbd021 100644 --- a/pkg/server/settingsworker.go +++ b/pkg/server/settingsworker.go @@ -44,7 +44,7 @@ func processSystemConfigKVs( } settingsKVs = append(settingsKVs, kv) - if err := u.Set(k, v, t); err != nil { + if err := u.Set(ctx, k, v, t); err != nil { log.Warningf(ctx, "setting %q to %q failed: %+v", k, v, err) } return nil @@ -56,7 +56,7 @@ this likely indicates the settings table structure or encoding has been altered; skipping settings updates`) } } - u.ResetRemaining() + u.ResetRemaining(ctx) return errors.Wrap(storeCachedSettingsKVs(ctx, eng, settingsKVs), "while storing settings kvs") } diff --git a/pkg/server/stats_test.go b/pkg/server/stats_test.go index 836efaae972d..c0f18bbfe802 100644 --- a/pkg/server/stats_test.go +++ b/pkg/server/stats_test.go @@ -98,7 +98,7 @@ func TestEnsureSQLStatsAreFlushedForTelemetry(t *testing.T) { params.Settings = cluster.MakeClusterSettings() // Set the SQL stat refresh rate very low so that SQL stats are continuously // flushed into the telemetry reporting stats pool. - sql.SQLStatReset.Override(¶ms.Settings.SV, 10*time.Millisecond) + sql.SQLStatReset.Override(ctx, ¶ms.Settings.SV, 10*time.Millisecond) s, sqlDB, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 97d787d1b748..a03796ca9211 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -131,7 +131,7 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config { enabledSeparated := rand.Intn(2) == 0 log.Infof(context.Background(), "test Config is randomly setting enabledSeparated: %t", enabledSeparated) - storage.SeparatedIntentsEnabled.Override(&st.SV, enabledSeparated) + storage.SeparatedIntentsEnabled.Override(context.Background(), &st.SV, enabledSeparated) } st.ExternalIODir = params.ExternalIODir cfg := makeTestConfig(st) @@ -266,7 +266,7 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config { // For test servers, leave interleaved tables enabled by default. We'll remove // this when we remove interleaved tables altogether. - sql.InterleavedTablesEnabled.Override(&cfg.Settings.SV, true) + sql.InterleavedTablesEnabled.Override(context.Background(), &cfg.Settings.SV, true) return cfg } diff --git a/pkg/settings/bool.go b/pkg/settings/bool.go index ba5df4cbb76d..a25ea51e96bc 100644 --- a/pkg/settings/bool.go +++ b/pkg/settings/bool.go @@ -10,6 +10,8 @@ package settings +import "context" + // BoolSetting is the interface of a setting variable that will be // updated automatically when the corresponding cluster-wide setting // of type "bool" is updated. @@ -56,8 +58,8 @@ var _ = (*BoolSetting).Default // default value. // // For testing usage only. -func (b *BoolSetting) Override(sv *Values, v bool) { - b.set(sv, v) +func (b *BoolSetting) Override(ctx context.Context, sv *Values, v bool) { + b.set(ctx, sv, v) vInt := int64(0) if v { @@ -66,22 +68,22 @@ func (b *BoolSetting) Override(sv *Values, v bool) { sv.setDefaultOverrideInt64(b.slotIdx, vInt) } -func (b *BoolSetting) set(sv *Values, v bool) { +func (b *BoolSetting) set(ctx context.Context, sv *Values, v bool) { vInt := int64(0) if v { vInt = 1 } - sv.setInt64(b.slotIdx, vInt) + sv.setInt64(ctx, b.slotIdx, vInt) } -func (b *BoolSetting) setToDefault(sv *Values) { +func (b *BoolSetting) setToDefault(ctx context.Context, sv *Values) { // See if the default value was overridden. ok, val, _ := sv.getDefaultOverride(b.slotIdx) if ok { - b.set(sv, val > 0) + b.set(ctx, sv, val > 0) return } - b.set(sv, b.defaultValue) + b.set(ctx, sv, b.defaultValue) } // WithPublic sets public visibility and can be chained. diff --git a/pkg/settings/cluster/cluster_settings.go b/pkg/settings/cluster/cluster_settings.go index 13291e15d4d6..8eaac02047dd 100644 --- a/pkg/settings/cluster/cluster_settings.go +++ b/pkg/settings/cluster/cluster_settings.go @@ -123,7 +123,7 @@ func MakeClusterSettings() *Settings { sv := &s.SV s.Version = clusterversion.MakeVersionHandle(&s.SV) - sv.Init(s.Version) + sv.Init(context.TODO(), s.Version) s.Tracer = tracing.NewTracer() isActive := int32(0) // atomic @@ -140,7 +140,7 @@ func MakeClusterSettings() *Settings { } return false } - s.Tracer.Configure(sv) + s.Tracer.Configure(context.TODO(), sv) return s } @@ -171,10 +171,10 @@ func MakeTestingClusterSettingsWithVersions( sv := &s.SV s.Version = clusterversion.MakeVersionHandleWithOverride( &s.SV, binaryVersion, binaryMinSupportedVersion) - sv.Init(s.Version) + sv.Init(context.TODO(), s.Version) s.Tracer = tracing.NewTracer() - s.Tracer.Configure(sv) + s.Tracer.Configure(context.TODO(), sv) if initializeVersion { // Initialize cluster version to specified binaryVersion. diff --git a/pkg/settings/duration.go b/pkg/settings/duration.go index dd5cd6240058..0537e2fbd1a0 100644 --- a/pkg/settings/duration.go +++ b/pkg/settings/duration.go @@ -11,6 +11,7 @@ package settings import ( + "context" "time" "github.com/cockroachdb/errors" @@ -86,29 +87,29 @@ func (d *DurationSetting) Validate(v time.Duration) error { // default value. // // For testing usage only. -func (d *DurationSetting) Override(sv *Values, v time.Duration) { - sv.setInt64(d.slotIdx, int64(v)) +func (d *DurationSetting) Override(ctx context.Context, sv *Values, v time.Duration) { + sv.setInt64(ctx, d.slotIdx, int64(v)) sv.setDefaultOverrideInt64(d.slotIdx, int64(v)) } -func (d *DurationSetting) set(sv *Values, v time.Duration) error { +func (d *DurationSetting) set(ctx context.Context, sv *Values, v time.Duration) error { if err := d.Validate(v); err != nil { return err } - sv.setInt64(d.slotIdx, int64(v)) + sv.setInt64(ctx, d.slotIdx, int64(v)) return nil } -func (d *DurationSetting) setToDefault(sv *Values) { +func (d *DurationSetting) setToDefault(ctx context.Context, sv *Values) { // See if the default value was overridden. ok, val, _ := sv.getDefaultOverride(d.slotIdx) if ok { // As per the semantics of override, these values don't go through // validation. - _ = d.set(sv, time.Duration(val)) + _ = d.set(ctx, sv, time.Duration(val)) return } - if err := d.set(sv, d.defaultValue); err != nil { + if err := d.set(ctx, sv, d.defaultValue); err != nil { panic(err) } } diff --git a/pkg/settings/enum.go b/pkg/settings/enum.go index c63993accd23..6f42ffe7d2a1 100644 --- a/pkg/settings/enum.go +++ b/pkg/settings/enum.go @@ -12,6 +12,7 @@ package settings import ( "bytes" + "context" "fmt" "sort" "strconv" @@ -77,11 +78,11 @@ func (e *EnumSetting) GetAvailableValuesAsHint() string { return "Available values: " + strings.Join(vals, ", ") } -func (e *EnumSetting) set(sv *Values, k int64) error { +func (e *EnumSetting) set(ctx context.Context, sv *Values, k int64) error { if _, ok := e.enumValues[k]; !ok { return errors.Errorf("unrecognized value %d", k) } - return e.IntSetting.set(sv, k) + return e.IntSetting.set(ctx, sv, k) } func enumValuesToDesc(enumValues map[int64]string) string { diff --git a/pkg/settings/float.go b/pkg/settings/float.go index 43c918ee2ec9..8b124b026754 100644 --- a/pkg/settings/float.go +++ b/pkg/settings/float.go @@ -11,6 +11,7 @@ package settings import ( + "context" "math" "github.com/cockroachdb/errors" @@ -63,8 +64,8 @@ var _ = (*FloatSetting).Default // the default value. // // For testing usage only. -func (f *FloatSetting) Override(sv *Values, v float64) { - if err := f.set(sv, v); err != nil { +func (f *FloatSetting) Override(ctx context.Context, sv *Values, v float64) { + if err := f.set(ctx, sv, v); err != nil { panic(err) } sv.setDefaultOverrideInt64(f.slotIdx, int64(math.Float64bits(v))) @@ -80,24 +81,24 @@ func (f *FloatSetting) Validate(v float64) error { return nil } -func (f *FloatSetting) set(sv *Values, v float64) error { +func (f *FloatSetting) set(ctx context.Context, sv *Values, v float64) error { if err := f.Validate(v); err != nil { return err } - sv.setInt64(f.slotIdx, int64(math.Float64bits(v))) + sv.setInt64(ctx, f.slotIdx, int64(math.Float64bits(v))) return nil } -func (f *FloatSetting) setToDefault(sv *Values) { +func (f *FloatSetting) setToDefault(ctx context.Context, sv *Values) { // See if the default value was overridden. ok, val, _ := sv.getDefaultOverride(f.slotIdx) if ok { // As per the semantics of override, these values don't go through // validation. - _ = f.set(sv, math.Float64frombits(uint64((val)))) + _ = f.set(ctx, sv, math.Float64frombits(uint64((val)))) return } - if err := f.set(sv, f.defaultValue); err != nil { + if err := f.set(ctx, sv, f.defaultValue); err != nil { panic(err) } } diff --git a/pkg/settings/int.go b/pkg/settings/int.go index 16e3143840a2..d2e0d606f5c7 100644 --- a/pkg/settings/int.go +++ b/pkg/settings/int.go @@ -10,7 +10,11 @@ package settings -import "github.com/cockroachdb/errors" +import ( + "context" + + "github.com/cockroachdb/errors" +) // IntSetting is the interface of a setting variable that will be // updated automatically when the corresponding cluster-wide setting @@ -69,29 +73,29 @@ func (i *IntSetting) Validate(v int64) error { // default value. // // For testing usage only. -func (i *IntSetting) Override(sv *Values, v int64) { - sv.setInt64(i.slotIdx, v) +func (i *IntSetting) Override(ctx context.Context, sv *Values, v int64) { + sv.setInt64(ctx, i.slotIdx, v) sv.setDefaultOverrideInt64(i.slotIdx, v) } -func (i *IntSetting) set(sv *Values, v int64) error { +func (i *IntSetting) set(ctx context.Context, sv *Values, v int64) error { if err := i.Validate(v); err != nil { return err } - sv.setInt64(i.slotIdx, v) + sv.setInt64(ctx, i.slotIdx, v) return nil } -func (i *IntSetting) setToDefault(sv *Values) { +func (i *IntSetting) setToDefault(ctx context.Context, sv *Values) { // See if the default value was overridden. ok, val, _ := sv.getDefaultOverride(i.slotIdx) if ok { // As per the semantics of override, these values don't go through // validation. - _ = i.set(sv, val) + _ = i.set(ctx, sv, val) return } - if err := i.set(sv, i.defaultValue); err != nil { + if err := i.set(ctx, sv, i.defaultValue); err != nil { panic(err) } } diff --git a/pkg/settings/setting.go b/pkg/settings/setting.go index 1aadc85df6a7..d780cbe3f9fe 100644 --- a/pkg/settings/setting.go +++ b/pkg/settings/setting.go @@ -11,6 +11,7 @@ package settings import ( + "context" "fmt" "strings" "sync/atomic" @@ -42,7 +43,7 @@ type Values struct { syncutil.Mutex // NB: any in place modification to individual slices must also hold the // lock, e.g. if we ever add RemoveOnChange or something. - onChange [MaxSettings][]func() + onChange [MaxSettings][]func(ctx context.Context) } // opaque is an arbitrary object that can be set by a higher layer to make it // accessible from certain callbacks (like state machine transformers). @@ -92,10 +93,10 @@ var TestOpaque interface{} = testOpaqueType{} // variables to their defaults. // // The opaque argument can be retrieved later via Opaque(). -func (sv *Values) Init(opaque interface{}) { +func (sv *Values) Init(ctx context.Context, opaque interface{}) { sv.opaque = opaque for _, s := range registry { - s.setToDefault(sv) + s.setToDefault(ctx, sv) } } @@ -104,12 +105,12 @@ func (sv *Values) Opaque() interface{} { return sv.opaque } -func (sv *Values) settingChanged(slotIdx int) { +func (sv *Values) settingChanged(ctx context.Context, slotIdx int) { sv.changeMu.Lock() funcs := sv.changeMu.onChange[slotIdx-1] sv.changeMu.Unlock() for _, fn := range funcs { - fn() + fn(ctx) } } @@ -121,9 +122,9 @@ func (c *valuesContainer) getGeneric(slotIdx int) interface{} { return c.genericVals[slotIdx-1].Load() } -func (sv *Values) setInt64(slotIdx int, newVal int64) { +func (sv *Values) setInt64(ctx context.Context, slotIdx int, newVal int64) { if sv.container.setInt64Val(slotIdx-1, newVal) { - sv.settingChanged(slotIdx) + sv.settingChanged(ctx, slotIdx) } } @@ -161,9 +162,9 @@ func (sv *Values) getDefaultOverride(slotIdx int) (bool, int64, *atomic.Value) { &sv.overridesMu.defaultOverrides.genericVals[slotIdx] } -func (sv *Values) setGeneric(slotIdx int, newVal interface{}) { +func (sv *Values) setGeneric(ctx context.Context, slotIdx int, newVal interface{}) { sv.container.setGenericVal(slotIdx-1, newVal) - sv.settingChanged(slotIdx) + sv.settingChanged(ctx, slotIdx) } func (sv *Values) getInt64(slotIdx int) int64 { @@ -177,7 +178,7 @@ func (sv *Values) getGeneric(slotIdx int) interface{} { // setOnChange installs a callback to be called when a setting's value changes. // `fn` should avoid doing long-running or blocking work as it is called on the // goroutine which handles all settings updates. -func (sv *Values) setOnChange(slotIdx int, fn func()) { +func (sv *Values) setOnChange(slotIdx int, fn func(ctx context.Context)) { sv.changeMu.Lock() sv.changeMu.onChange[slotIdx-1] = append(sv.changeMu.onChange[slotIdx-1], fn) sv.changeMu.Unlock() @@ -219,7 +220,7 @@ type WritableSetting interface { // SetOnChange installs a callback to be called when a setting's value // changes. `fn` should avoid doing long-running or blocking work as it is // called on the goroutine which handles all settings updates. - SetOnChange(sv *Values, fn func()) + SetOnChange(sv *Values, fn func(ctx context.Context)) // ErrorHint returns a hint message to be displayed to the user when there's // an error. ErrorHint() (bool, string) @@ -229,7 +230,7 @@ type extendedSetting interface { WritableSetting isRetired() bool - setToDefault(sv *Values) + setToDefault(ctx context.Context, sv *Values) setDescription(desc string) setSlotIdx(slotIdx int) getSlotIdx() int @@ -343,14 +344,14 @@ func (i *common) SetRetired() { // SetOnChange installs a callback to be called when a setting's value changes. // `fn` should avoid doing long-running or blocking work as it is called on the // goroutine which handles all settings updates. -func (i *common) SetOnChange(sv *Values, fn func()) { +func (i *common) SetOnChange(sv *Values, fn func(ctx context.Context)) { sv.setOnChange(i.slotIdx, fn) } type numericSetting interface { Setting Validate(i int64) error - set(sv *Values, i int64) error + set(ctx context.Context, sv *Values, i int64) error } // TestingIsReportable is used in testing for reportability. diff --git a/pkg/settings/settings_test.go b/pkg/settings/settings_test.go index 24674d160c34..f539634e7cd9 100644 --- a/pkg/settings/settings_test.go +++ b/pkg/settings/settings_test.go @@ -192,37 +192,39 @@ var iVal = settings.RegisterIntSetting( }) func TestValidation(t *testing.T) { + ctx := context.Background() sv := &settings.Values{} - sv.Init(settings.TestOpaque) + sv.Init(ctx, settings.TestOpaque) u := settings.NewUpdater(sv) t.Run("d_with_maximum", func(t *testing.T) { - err := u.Set("d_with_maximum", "1h", "d") + err := u.Set(ctx, "d_with_maximum", "1h", "d") require.NoError(t, err) - err = u.Set("d_with_maximum", "0h", "d") + err = u.Set(ctx, "d_with_maximum", "0h", "d") require.NoError(t, err) - err = u.Set("d_with_maximum", "30m", "d") + err = u.Set(ctx, "d_with_maximum", "30m", "d") require.NoError(t, err) - err = u.Set("d_with_maximum", "-1m", "d") + err = u.Set(ctx, "d_with_maximum", "-1m", "d") require.Error(t, err) - err = u.Set("d_with_maximum", "1h1s", "d") + err = u.Set(ctx, "d_with_maximum", "1h1s", "d") require.Error(t, err) }) } func TestCache(t *testing.T) { + ctx := context.Background() sv := &settings.Values{} - sv.Init(settings.TestOpaque) + sv.Init(ctx, settings.TestOpaque) - boolTA.SetOnChange(sv, func() { changes.boolTA++ }) - strFooA.SetOnChange(sv, func() { changes.strFooA++ }) - i1A.SetOnChange(sv, func() { changes.i1A++ }) - fA.SetOnChange(sv, func() { changes.fA++ }) - dA.SetOnChange(sv, func() { changes.dA++ }) - duA.SetOnChange(sv, func() { changes.duA++ }) - eA.SetOnChange(sv, func() { changes.eA++ }) - byteSize.SetOnChange(sv, func() { changes.byteSize++ }) + boolTA.SetOnChange(sv, func(context.Context) { changes.boolTA++ }) + strFooA.SetOnChange(sv, func(context.Context) { changes.strFooA++ }) + i1A.SetOnChange(sv, func(context.Context) { changes.i1A++ }) + fA.SetOnChange(sv, func(context.Context) { changes.fA++ }) + dA.SetOnChange(sv, func(context.Context) { changes.dA++ }) + duA.SetOnChange(sv, func(context.Context) { changes.duA++ }) + eA.SetOnChange(sv, func(context.Context) { changes.eA++ }) + byteSize.SetOnChange(sv, func(context.Context) { changes.byteSize++ }) t.Run("VersionSetting", func(t *testing.T) { ctx := context.Background() @@ -271,7 +273,7 @@ func TestCache(t *testing.T) { if err := setDummyVersion(newDummyV, mB, sv); err != nil { t.Fatal(err) } - u.ResetRemaining() + u.ResetRemaining(ctx) if exp, act := "&{default XX}", mB.String(sv); exp != act { t.Fatalf("wanted %q, got %q", exp, act) } @@ -368,46 +370,46 @@ func TestCache(t *testing.T) { if expected, actual := 0, changes.boolTA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("bool.t", settings.EncodeBool(false), "b"); err != nil { + if err := u.Set(ctx, "bool.t", settings.EncodeBool(false), "b"); err != nil { t.Fatal(err) } if expected, actual := 1, changes.boolTA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("bool.f", settings.EncodeBool(true), "b"); err != nil { + if err := u.Set(ctx, "bool.f", settings.EncodeBool(true), "b"); err != nil { t.Fatal(err) } if expected, actual := 0, changes.strFooA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("str.foo", "baz", "s"); err != nil { + if err := u.Set(ctx, "str.foo", "baz", "s"); err != nil { t.Fatal(err) } if expected, actual := 1, changes.strFooA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("str.val", "valid", "s"); err != nil { + if err := u.Set(ctx, "str.val", "valid", "s"); err != nil { t.Fatal(err) } - if err := u.Set("i.2", settings.EncodeInt(3), "i"); err != nil { + if err := u.Set(ctx, "i.2", settings.EncodeInt(3), "i"); err != nil { t.Fatal(err) } if expected, actual := 0, changes.fA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("f", settings.EncodeFloat(3.1), "f"); err != nil { + if err := u.Set(ctx, "f", settings.EncodeFloat(3.1), "f"); err != nil { t.Fatal(err) } if expected, actual := 1, changes.fA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("fVal", settings.EncodeFloat(3.1), "f"); err != nil { + if err := u.Set(ctx, "fVal", settings.EncodeFloat(3.1), "f"); err != nil { t.Fatal(err) } if expected, actual := 0, changes.dA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("d", settings.EncodeDuration(2*time.Hour), "d"); err != nil { + if err := u.Set(ctx, "d", settings.EncodeDuration(2*time.Hour), "d"); err != nil { t.Fatal(err) } if expected, actual := 1, changes.dA; expected != actual { @@ -416,45 +418,45 @@ func TestCache(t *testing.T) { if expected, actual := 0, changes.duA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("d_with_explicit_unit", settings.EncodeDuration(2*time.Hour), "d"); err != nil { + if err := u.Set(ctx, "d_with_explicit_unit", settings.EncodeDuration(2*time.Hour), "d"); err != nil { t.Fatal(err) } if expected, actual := 1, changes.duA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("dVal", settings.EncodeDuration(2*time.Hour), "d"); err != nil { + if err := u.Set(ctx, "dVal", settings.EncodeDuration(2*time.Hour), "d"); err != nil { t.Fatal(err) } if expected, actual := 0, changes.byteSize; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("zzz", settings.EncodeInt(mb*5), "z"); err != nil { + if err := u.Set(ctx, "zzz", settings.EncodeInt(mb*5), "z"); err != nil { t.Fatal(err) } if expected, actual := 1, changes.byteSize; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("byteSize.Val", settings.EncodeInt(mb*5), "z"); err != nil { + if err := u.Set(ctx, "byteSize.Val", settings.EncodeInt(mb*5), "z"); err != nil { t.Fatal(err) } if expected, actual := 0, changes.eA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("e", settings.EncodeInt(2), "e"); err != nil { + if err := u.Set(ctx, "e", settings.EncodeInt(2), "e"); err != nil { t.Fatal(err) } if expected, actual := 1, changes.eA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } if expected, err := "strconv.Atoi: parsing \"notAValidValue\": invalid syntax", - u.Set("e", "notAValidValue", "e"); !testutils.IsError(err, expected) { + u.Set(ctx, "e", "notAValidValue", "e"); !testutils.IsError(err, expected) { t.Fatalf("expected '%s' != actual error '%s'", expected, err) } defaultDummyV := dummyVersion{msg1: "default", growsbyone: "AB"} if err := setDummyVersion(defaultDummyV, mA, sv); err != nil { t.Fatal(err) } - u.ResetRemaining() + u.ResetRemaining(ctx) if expected, actual := false, boolTA.Get(sv); expected != actual { t.Fatalf("expected %v, got %v", expected, actual) @@ -508,25 +510,25 @@ func TestCache(t *testing.T) { t.Run("any setting not included in an Updater reverts to default", func(t *testing.T) { { u := settings.NewUpdater(sv) - if err := u.Set("bool.f", settings.EncodeBool(true), "b"); err != nil { + if err := u.Set(ctx, "bool.f", settings.EncodeBool(true), "b"); err != nil { t.Fatal(err) } if expected, actual := 0, changes.i1A; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("i.1", settings.EncodeInt(1), "i"); err != nil { + if err := u.Set(ctx, "i.1", settings.EncodeInt(1), "i"); err != nil { t.Fatal(err) } if expected, actual := 1, changes.i1A; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set("i.2", settings.EncodeInt(7), "i"); err != nil { + if err := u.Set(ctx, "i.2", settings.EncodeInt(7), "i"); err != nil { t.Fatal(err) } - if err := u.Set("i.Val", settings.EncodeInt(1), "i"); err != nil { + if err := u.Set(ctx, "i.Val", settings.EncodeInt(1), "i"); err != nil { t.Fatal(err) } - u.ResetRemaining() + u.ResetRemaining(ctx) } if expected, actual := true, boolFA.Get(sv); expected != actual { @@ -534,7 +536,7 @@ func TestCache(t *testing.T) { } // If the updater doesn't have a key, e.g. if the setting has been deleted, // Doneing it from the cache. - settings.NewUpdater(sv).ResetRemaining() + settings.NewUpdater(sv).ResetRemaining(ctx) if expected, actual := 2, changes.boolTA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) @@ -556,10 +558,10 @@ func TestCache(t *testing.T) { t.Run("an invalid update to a given setting preserves its previously set value", func(t *testing.T) { { u := settings.NewUpdater(sv) - if err := u.Set("i.2", settings.EncodeInt(9), "i"); err != nil { + if err := u.Set(ctx, "i.2", settings.EncodeInt(9), "i"); err != nil { t.Fatal(err) } - u.ResetRemaining() + u.ResetRemaining(ctx) } before := i2A.Get(sv) @@ -567,12 +569,12 @@ func TestCache(t *testing.T) { // value. { u := settings.NewUpdater(sv) - if err := u.Set("i.2", settings.EncodeBool(false), "b"); !testutils.IsError(err, + if err := u.Set(ctx, "i.2", settings.EncodeBool(false), "b"); !testutils.IsError(err, "setting 'i.2' defined as type i, not b", ) { t.Fatal(err) } - u.ResetRemaining() + u.ResetRemaining(ctx) } if expected, actual := before, i2A.Get(sv); expected != actual { @@ -583,12 +585,12 @@ func TestCache(t *testing.T) { // current value. { u := settings.NewUpdater(sv) - if err := u.Set("i.2", settings.EncodeBool(false), "i"); !testutils.IsError(err, + if err := u.Set(ctx, "i.2", settings.EncodeBool(false), "i"); !testutils.IsError(err, "strconv.Atoi: parsing \"false\": invalid syntax", ) { t.Fatal(err) } - u.ResetRemaining() + u.ResetRemaining(ctx) } if expected, actual := before, i2A.Get(sv); expected != actual { @@ -600,12 +602,12 @@ func TestCache(t *testing.T) { beforestrVal := strVal.Get(sv) { u := settings.NewUpdater(sv) - if err := u.Set("str.val", "abc2def", "s"); !testutils.IsError(err, + if err := u.Set(ctx, "str.val", "abc2def", "s"); !testutils.IsError(err, "not all runes of abc2def are letters: 2", ) { t.Fatal(err) } - u.ResetRemaining() + u.ResetRemaining(ctx) } if expected, actual := beforestrVal, strVal.Get(sv); expected != actual { t.Fatalf("expected %v, got %v", expected, actual) @@ -614,12 +616,12 @@ func TestCache(t *testing.T) { beforeDVal := dVal.Get(sv) { u := settings.NewUpdater(sv) - if err := u.Set("dVal", settings.EncodeDuration(-time.Hour), "d"); !testutils.IsError(err, + if err := u.Set(ctx, "dVal", settings.EncodeDuration(-time.Hour), "d"); !testutils.IsError(err, "cannot be set to a negative duration: -1h0m0s", ) { t.Fatal(err) } - u.ResetRemaining() + u.ResetRemaining(ctx) } if expected, actual := beforeDVal, dVal.Get(sv); expected != actual { t.Fatalf("expected %v, got %v", expected, actual) @@ -628,12 +630,12 @@ func TestCache(t *testing.T) { beforeByteSizeVal := byteSizeVal.Get(sv) { u := settings.NewUpdater(sv) - if err := u.Set("byteSize.Val", settings.EncodeInt(-mb), "z"); !testutils.IsError(err, + if err := u.Set(ctx, "byteSize.Val", settings.EncodeInt(-mb), "z"); !testutils.IsError(err, "bytesize cannot be negative", ) { t.Fatal(err) } - u.ResetRemaining() + u.ResetRemaining(ctx) } if expected, actual := beforeByteSizeVal, byteSizeVal.Get(sv); expected != actual { t.Fatalf("expected %v, got %v", expected, actual) @@ -642,12 +644,12 @@ func TestCache(t *testing.T) { beforeFVal := fVal.Get(sv) { u := settings.NewUpdater(sv) - if err := u.Set("fVal", settings.EncodeFloat(-1.1), "f"); !testutils.IsError(err, + if err := u.Set(ctx, "fVal", settings.EncodeFloat(-1.1), "f"); !testutils.IsError(err, "cannot set to a negative value: -1.1", ) { t.Fatal(err) } - u.ResetRemaining() + u.ResetRemaining(ctx) } if expected, actual := beforeFVal, fVal.Get(sv); expected != actual { t.Fatalf("expected %v, got %v", expected, actual) @@ -656,12 +658,12 @@ func TestCache(t *testing.T) { beforeIVal := iVal.Get(sv) { u := settings.NewUpdater(sv) - if err := u.Set("i.Val", settings.EncodeInt(-1), "i"); !testutils.IsError(err, + if err := u.Set(ctx, "i.Val", settings.EncodeInt(-1), "i"); !testutils.IsError(err, "int cannot be negative", ) { t.Fatal(err) } - u.ResetRemaining() + u.ResetRemaining(ctx) } if expected, actual := beforeIVal, iVal.Get(sv); expected != actual { t.Fatalf("expected %v, got %v", expected, actual) @@ -680,6 +682,7 @@ func TestIsReportable(t *testing.T) { } func TestOnChangeWithMaxSettings(t *testing.T) { + ctx := context.Background() // Register MaxSettings settings to ensure that no errors occur. maxName, err := batchRegisterSettings(t, t.Name(), settings.MaxSettings-settings.NumRegisteredSettings()) if err != nil { @@ -688,7 +691,7 @@ func TestOnChangeWithMaxSettings(t *testing.T) { // Change the max slotIdx setting to ensure that no errors occur. sv := &settings.Values{} - sv.Init(settings.TestOpaque) + sv.Init(ctx, settings.TestOpaque) var changes int s, ok := settings.Lookup(maxName, settings.LookupForLocalAccess) if !ok { @@ -698,10 +701,10 @@ func TestOnChangeWithMaxSettings(t *testing.T) { if !ok { t.Fatalf("expected int setting, got %T", s) } - intSetting.SetOnChange(sv, func() { changes++ }) + intSetting.SetOnChange(sv, func(ctx context.Context) { changes++ }) u := settings.NewUpdater(sv) - if err := u.Set(maxName, settings.EncodeInt(9), "i"); err != nil { + if err := u.Set(ctx, maxName, settings.EncodeInt(9), "i"); err != nil { t.Fatal(err) } @@ -747,36 +750,37 @@ var overrideDuration = settings.RegisterDurationSetting("override.duration", "de var overrideFloat = settings.RegisterFloatSetting("override.float", "desc", 1.0) func TestOverride(t *testing.T) { + ctx := context.Background() sv := &settings.Values{} - sv.Init(settings.TestOpaque) + sv.Init(ctx, settings.TestOpaque) // Test override for bool setting. require.Equal(t, true, overrideBool.Get(sv)) - overrideBool.Override(sv, false) + overrideBool.Override(ctx, sv, false) require.Equal(t, false, overrideBool.Get(sv)) u := settings.NewUpdater(sv) - u.ResetRemaining() + u.ResetRemaining(ctx) require.Equal(t, false, overrideBool.Get(sv)) // Test override for int setting. require.Equal(t, int64(0), overrideInt.Get(sv)) - overrideInt.Override(sv, 42) + overrideInt.Override(ctx, sv, 42) require.Equal(t, int64(42), overrideInt.Get(sv)) - u.ResetRemaining() + u.ResetRemaining(ctx) require.Equal(t, int64(42), overrideInt.Get(sv)) // Test override for duration setting. require.Equal(t, time.Second, overrideDuration.Get(sv)) - overrideDuration.Override(sv, 42*time.Second) + overrideDuration.Override(ctx, sv, 42*time.Second) require.Equal(t, 42*time.Second, overrideDuration.Get(sv)) - u.ResetRemaining() + u.ResetRemaining(ctx) require.Equal(t, 42*time.Second, overrideDuration.Get(sv)) // Test override for float setting. require.Equal(t, 1.0, overrideFloat.Get(sv)) - overrideFloat.Override(sv, 42.0) + overrideFloat.Override(ctx, sv, 42.0) require.Equal(t, 42.0, overrideFloat.Get(sv)) - u.ResetRemaining() + u.ResetRemaining(ctx) require.Equal(t, 42.0, overrideFloat.Get(sv)) } @@ -788,6 +792,6 @@ func setDummyVersion(dv dummyVersion, vs *settings.VersionSetting, sv *settings. if err != nil { return err } - vs.SetInternal(sv, encoded) + vs.SetInternal(context.Background(), sv, encoded) return nil } diff --git a/pkg/settings/string.go b/pkg/settings/string.go index 7ab80c17ced2..0e155f7e543b 100644 --- a/pkg/settings/string.go +++ b/pkg/settings/string.go @@ -10,7 +10,11 @@ package settings -import "github.com/cockroachdb/errors" +import ( + "context" + + "github.com/cockroachdb/errors" +) // StringSetting is the interface of a setting variable that will be // updated automatically when the corresponding cluster-wide setting @@ -71,22 +75,22 @@ func (s *StringSetting) Validate(sv *Values, v string) error { // Override sets the setting to the given value, assuming // it passes validation. -func (s *StringSetting) Override(sv *Values, v string) { - _ = s.set(sv, v) +func (s *StringSetting) Override(ctx context.Context, sv *Values, v string) { + _ = s.set(ctx, sv, v) } -func (s *StringSetting) set(sv *Values, v string) error { +func (s *StringSetting) set(ctx context.Context, sv *Values, v string) error { if err := s.Validate(sv, v); err != nil { return err } if s.Get(sv) != v { - sv.setGeneric(s.slotIdx, v) + sv.setGeneric(ctx, s.slotIdx, v) } return nil } -func (s *StringSetting) setToDefault(sv *Values) { - if err := s.set(sv, s.defaultValue); err != nil { +func (s *StringSetting) setToDefault(ctx context.Context, sv *Values) { + if err := s.set(ctx, sv, s.defaultValue); err != nil { panic(err) } } diff --git a/pkg/settings/updater.go b/pkg/settings/updater.go index c5eeeba5ca5e..04232cf8318f 100644 --- a/pkg/settings/updater.go +++ b/pkg/settings/updater.go @@ -11,6 +11,7 @@ package settings import ( + "context" "strconv" "time" @@ -49,18 +50,18 @@ type updater struct { // wrapped atomic settings values as we go and note which settings were updated, // then set the rest to default in ResetRemaining(). type Updater interface { - Set(k, rawValue, valType string) error - ResetRemaining() + Set(ctx context.Context, k, rawValue, valType string) error + ResetRemaining(ctx context.Context) } // A NoopUpdater ignores all updates. type NoopUpdater struct{} // Set implements Updater. It is a no-op. -func (u NoopUpdater) Set(_, _, _ string) error { return nil } +func (u NoopUpdater) Set(ctx context.Context, k, rawValue, valType string) error { return nil } // ResetRemaining implements Updater. It is a no-op. -func (u NoopUpdater) ResetRemaining() {} +func (u NoopUpdater) ResetRemaining(context.Context) {} // NewUpdater makes an Updater. func NewUpdater(sv *Values) Updater { @@ -71,7 +72,7 @@ func NewUpdater(sv *Values) Updater { } // Set attempts to parse and update a setting and notes that it was updated. -func (u updater) Set(key, rawValue string, vt string) error { +func (u updater) Set(ctx context.Context, key, rawValue string, vt string) error { d, ok := registry[key] if !ok { if _, ok := retiredSettings[key]; ok { @@ -89,38 +90,38 @@ func (u updater) Set(key, rawValue string, vt string) error { switch setting := d.(type) { case *StringSetting: - return setting.set(u.sv, rawValue) + return setting.set(ctx, u.sv, rawValue) case *BoolSetting: b, err := strconv.ParseBool(rawValue) if err != nil { return err } - setting.set(u.sv, b) + setting.set(ctx, u.sv, b) return nil case numericSetting: // includes *EnumSetting i, err := strconv.Atoi(rawValue) if err != nil { return err } - return setting.set(u.sv, int64(i)) + return setting.set(ctx, u.sv, int64(i)) case *FloatSetting: f, err := strconv.ParseFloat(rawValue, 64) if err != nil { return err } - return setting.set(u.sv, f) + return setting.set(ctx, u.sv, f) case *DurationSetting: d, err := time.ParseDuration(rawValue) if err != nil { return err } - return setting.set(u.sv, d) + return setting.set(ctx, u.sv, d) case *DurationSettingWithExplicitUnit: d, err := time.ParseDuration(rawValue) if err != nil { return err } - return setting.set(u.sv, d) + return setting.set(ctx, u.sv, d) case *VersionSetting: // We intentionally avoid updating the setting through this code path. // The specific setting backed by VersionSetting is the cluster version @@ -133,10 +134,10 @@ func (u updater) Set(key, rawValue string, vt string) error { } // ResetRemaining sets all settings not updated by the updater to their default values. -func (u updater) ResetRemaining() { +func (u updater) ResetRemaining(ctx context.Context) { for k, v := range registry { if _, ok := u.m[k]; !ok { - v.setToDefault(u.sv) + v.setToDefault(ctx, u.sv) } } } diff --git a/pkg/settings/version.go b/pkg/settings/version.go index 33e5824458cc..0a0c48d3ddf5 100644 --- a/pkg/settings/version.go +++ b/pkg/settings/version.go @@ -151,8 +151,8 @@ func (v *VersionSetting) GetInternal(sv *Values) interface{} { } // SetInternal updates the setting's value in the provided Values container. -func (v *VersionSetting) SetInternal(sv *Values, newVal interface{}) { - sv.setGeneric(v.getSlotIdx(), newVal) +func (v *VersionSetting) SetInternal(ctx context.Context, sv *Values, newVal interface{}) { + sv.setGeneric(ctx, v.getSlotIdx(), newVal) } // setToDefault is part of the extendingSetting interface. This is a no-op for @@ -161,7 +161,7 @@ func (v *VersionSetting) SetInternal(sv *Values, newVal interface{}) { // // TODO(irfansharif): Is this true? Shouldn't the default here just the the // version we initialize with? -func (v *VersionSetting) setToDefault(_ *Values) {} +func (v *VersionSetting) setToDefault(ctx context.Context, sv *Values) {} // RegisterVersionSetting adds the provided version setting to the global // registry. @@ -185,6 +185,6 @@ func TestingRegisterVersionSetting(key, desc string, impl VersionSettingImpl) *V // it out of the settings package before long (see TODO on the type itself). In // our current usage we don't rely on attaching pre-change triggers, so let's // not add it needlessly. -func (v *VersionSetting) SetOnChange(_ *Values, _ func()) { +func (v *VersionSetting) SetOnChange(sv *Values, fn func(ctx context.Context)) { panic("unimplemented") } diff --git a/pkg/sql/ambiguous_commit_test.go b/pkg/sql/ambiguous_commit_test.go index 7deb83bea407..9953e0f7f01e 100644 --- a/pkg/sql/ambiguous_commit_test.go +++ b/pkg/sql/ambiguous_commit_test.go @@ -63,6 +63,7 @@ func (t *interceptingTransport) SendNext( func TestAmbiguousCommit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() if mutations.MaxBatchSize(false /* forceProductionMaxBatchSize */) == 1 { // This test relies on the fact that the mutation batch consisting of a @@ -157,7 +158,7 @@ func TestAmbiguousCommit(t *testing.T) { for _, server := range tc.Servers { st := server.ClusterSettings() st.Manual.Store(true) - sql.DistSQLClusterExecMode.Override(&st.SV, int64(sessiondata.DistSQLOff)) + sql.DistSQLClusterExecMode.Override(ctx, &st.SV, int64(sessiondata.DistSQLOff)) } sqlDB := tc.Conns[0] diff --git a/pkg/sql/catalog/lease/lease_internal_test.go b/pkg/sql/catalog/lease/lease_internal_test.go index 8c76b57966d1..df5db90815a7 100644 --- a/pkg/sql/catalog/lease/lease_internal_test.go +++ b/pkg/sql/catalog/lease/lease_internal_test.go @@ -924,7 +924,7 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { // monotonically increasing expiration. This prevents two leases // from having the same expiration due to randomness, as the // leases are checked for having a different expiration. - LeaseJitterFraction.Override(&serverArgs.SV, 0) + LeaseJitterFraction.Override(ctx, &serverArgs.SV, 0) s, _, _ := serverutils.StartServer( t, serverArgs) diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 3a9d4c550256..fe54f94e7b6d 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -343,10 +343,11 @@ func createTestServerParams() base.TestServerArgs { func TestLeaseManagerReacquire(testingT *testing.T) { defer leaktest.AfterTest(testingT)() params := createTestServerParams() + ctx := context.Background() // Set the lease duration such that the next lease acquisition will // require the lease to be reacquired. - lease.LeaseDuration.Override(¶ms.SV, 0) + lease.LeaseDuration.Override(ctx, ¶ms.SV, 0) removalTracker := lease.NewLeaseRemovalTracker() params.Knobs = base.TestingKnobs{ @@ -1263,6 +1264,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); // lease is renewed. func TestLeaseRenewedAutomatically(testingT *testing.T) { defer leaktest.AfterTest(testingT)() + ctx := context.Background() var testAcquiredCount int32 var testAcquisitionBlockCount int32 @@ -1289,13 +1291,12 @@ func TestLeaseRenewedAutomatically(testingT *testing.T) { } // The lease jitter is set to ensure newer leases have higher // expiration timestamps. - lease.LeaseJitterFraction.Override(¶ms.SV, 0) + lease.LeaseJitterFraction.Override(ctx, ¶ms.SV, 0) // The renewal timeout is set to be the duration, so background // renewal should begin immediately after accessing a lease. - lease.LeaseRenewalDuration.Override(¶ms.SV, + lease.LeaseRenewalDuration.Override(ctx, ¶ms.SV, lease.LeaseDuration.Get(¶ms.SV)) - ctx := context.Background() t := newLeaseTest(testingT, params) defer t.cleanup() @@ -1712,6 +1713,7 @@ CREATE TABLE t.test0 (k CHAR PRIMARY KEY, v CHAR); // TODO(vivek): remove once epoch based leases is implemented. func TestLeaseRenewedPeriodically(testingT *testing.T) { defer leaktest.AfterTest(testingT)() + ctx := context.Background() var mu syncutil.Mutex releasedIDs := make(map[descpb.ID]struct{}) @@ -1750,14 +1752,13 @@ func TestLeaseRenewedPeriodically(testingT *testing.T) { // The lease jitter is set to ensure newer leases have higher // expiration timestamps. - lease.LeaseJitterFraction.Override(¶ms.SV, 0) + lease.LeaseJitterFraction.Override(ctx, ¶ms.SV, 0) // Lease duration to something small. - lease.LeaseDuration.Override(¶ms.SV, 50*time.Millisecond) + lease.LeaseDuration.Override(ctx, ¶ms.SV, 50*time.Millisecond) // Renewal timeout to 0 saying that the lease will get renewed only // after the lease expires when a request requests the descriptor. - lease.LeaseRenewalDuration.Override(¶ms.SV, 0) + lease.LeaseRenewalDuration.Override(ctx, ¶ms.SV, 0) - ctx := context.Background() t := newLeaseTest(testingT, params) defer t.cleanup() @@ -2866,7 +2867,7 @@ func TestLeaseTxnDeadlineExtension(t *testing.T) { params := createTestServerParams() // Set the lease duration such that the next lease acquisition will // require the lease to be reacquired. - lease.LeaseDuration.Override(¶ms.SV, 0) + lease.LeaseDuration.Override(ctx, ¶ms.SV, 0) params.Knobs.Store = &kvserver.StoreTestingKnobs{ TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error { filterMu.Lock() diff --git a/pkg/sql/colexec/external_hash_aggregator_test.go b/pkg/sql/colexec/external_hash_aggregator_test.go index 113cf06088c2..1d9c85e6073d 100644 --- a/pkg/sql/colexec/external_hash_aggregator_test.go +++ b/pkg/sql/colexec/external_hash_aggregator_test.go @@ -60,7 +60,7 @@ func TestExternalHashAggregator(t *testing.T) { rng, _ := randutil.NewPseudoRand() numForcedRepartitions := rng.Intn(5) for _, diskSpillingEnabled := range []bool{true, false} { - HashAggregationDiskSpillingEnabled.Override(&flowCtx.Cfg.Settings.SV, diskSpillingEnabled) + HashAggregationDiskSpillingEnabled.Override(ctx, &flowCtx.Cfg.Settings.SV, diskSpillingEnabled) // Test the case in which the default memory is used as well as the case // in which the hash aggregator spills to disk. for _, spillForced := range []bool{false, true} { diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 82caa3bdc90f..7d40f926ed5b 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -87,7 +87,7 @@ func NewServer(ctx context.Context, cfg execinfra.ServerConfig) *ServerImpl { } ds.memMonitor.Start(ctx, cfg.ParentMemoryMonitor, mon.BoundAccount{}) - colexec.HashAggregationDiskSpillingEnabled.SetOnChange(&cfg.Settings.SV, func() { + colexec.HashAggregationDiskSpillingEnabled.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) { if !colexec.HashAggregationDiskSpillingEnabled.Get(&cfg.Settings.SV) { telemetry.Inc(sqltelemetry.HashAggregationDiskSpillingDisabled) } diff --git a/pkg/sql/flowinfra/flow_scheduler.go b/pkg/sql/flowinfra/flow_scheduler.go index 5bd2e090f225..2617e75356e0 100644 --- a/pkg/sql/flowinfra/flow_scheduler.go +++ b/pkg/sql/flowinfra/flow_scheduler.go @@ -77,7 +77,7 @@ func NewFlowScheduler( } fs.mu.queue = list.New() fs.atomics.maxRunningFlows = int32(settingMaxRunningFlows.Get(&settings.SV)) - settingMaxRunningFlows.SetOnChange(&settings.SV, func() { + settingMaxRunningFlows.SetOnChange(&settings.SV, func(ctx context.Context) { atomic.StoreInt32(&fs.atomics.maxRunningFlows, int32(settingMaxRunningFlows.Get(&settings.SV))) }) return fs diff --git a/pkg/sql/instrumentation_test.go b/pkg/sql/instrumentation_test.go index 81d0b91f8d9e..d44499a35411 100644 --- a/pkg/sql/instrumentation_test.go +++ b/pkg/sql/instrumentation_test.go @@ -53,7 +53,7 @@ func TestSampledStatsCollection(t *testing.T) { if enable { v = 1 } - collectTxnStatsSampleRate.Override(&st.SV, v) + collectTxnStatsSampleRate.Override(ctx, &st.SV, v) } type queryer interface { diff --git a/pkg/sql/join_token_test.go b/pkg/sql/join_token_test.go index 7c88bfafaf7e..0caedee3c7ce 100644 --- a/pkg/sql/join_token_test.go +++ b/pkg/sql/join_token_test.go @@ -26,9 +26,10 @@ import ( func TestCreateJoinToken(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() settings := cluster.MakeTestingClusterSettings() - FeatureTLSAutoJoinEnabled.Override(&settings.SV, true) + FeatureTLSAutoJoinEnabled.Override(ctx, &settings.SV, true) s, sqldb, _ := serverutils.StartServer(t, base.TestServerArgs{ Settings: settings, }) diff --git a/pkg/sql/logictest/parallel_test.go b/pkg/sql/logictest/parallel_test.go index 0bb1ad8cfd3a..a4c9b8168c72 100644 --- a/pkg/sql/logictest/parallel_test.go +++ b/pkg/sql/logictest/parallel_test.go @@ -148,7 +148,7 @@ func (t *parallelTest) run(dir string) { log.Infof(t.ctx, "spec: %+v", spec) } - t.setup(&spec) + t.setup(context.Background(), &spec) defer t.close() for runListIdx, runList := range spec.Run { @@ -175,7 +175,7 @@ func (t *parallelTest) run(dir string) { } } -func (t *parallelTest) setup(spec *parTestSpec) { +func (t *parallelTest) setup(ctx context.Context, spec *parTestSpec) { if spec.ClusterSize == 0 { spec.ClusterSize = 1 } @@ -191,9 +191,9 @@ func (t *parallelTest) setup(spec *parTestSpec) { mode := sessiondata.DistSQLOff st := server.ClusterSettings() st.Manual.Store(true) - sql.DistSQLClusterExecMode.Override(&st.SV, int64(mode)) + sql.DistSQLClusterExecMode.Override(ctx, &st.SV, int64(mode)) // Disable automatic stats - they can interfere with the test shutdown. - stats.AutomaticStatisticsClusterMode.Override(&st.SV, false) + stats.AutomaticStatisticsClusterMode.Override(ctx, &st.SV, false) } t.clients = make([][]*gosql.DB, spec.ClusterSize) diff --git a/pkg/sql/pgwire/server.go b/pkg/sql/pgwire/server.go index ef5d85a62f7c..fb2d753e0b5c 100644 --- a/pkg/sql/pgwire/server.go +++ b/pkg/sql/pgwire/server.go @@ -291,7 +291,7 @@ func MakeServer( server.mu.Unlock() connAuthConf.SetOnChange(&st.SV, - func() { + func(ctx context.Context) { loadLocalAuthConfigUponRemoteSettingChange( ambientCtx.AnnotateCtx(context.Background()), server, st) }) diff --git a/pkg/sql/set_cluster_setting.go b/pkg/sql/set_cluster_setting.go index b50ef6223baf..baa9720e0528 100644 --- a/pkg/sql/set_cluster_setting.go +++ b/pkg/sql/set_cluster_setting.go @@ -246,9 +246,7 @@ func (n *setClusterSettingNode) startExec(params runParams) error { } if params.p.execCfg.TenantTestingKnobs != nil { - if err := params.p.execCfg.TenantTestingKnobs.ClusterSettingsUpdater.Set( - n.name, encoded, n.setting.Typ(), - ); err != nil { + if err := params.p.execCfg.TenantTestingKnobs.ClusterSettingsUpdater.Set(ctx, n.name, encoded, n.setting.Typ()); err != nil { return err } } diff --git a/pkg/sql/sqlliveness/slinstance/slinstance_test.go b/pkg/sql/sqlliveness/slinstance/slinstance_test.go index 9e7bc4d6adbc..32cc6a0dcccd 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance_test.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance_test.go @@ -40,8 +40,8 @@ func TestSQLInstance(t *testing.T) { clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion, true /* initializeVersion */) - slinstance.DefaultTTL.Override(&settings.SV, 2*time.Microsecond) - slinstance.DefaultHeartBeat.Override(&settings.SV, time.Microsecond) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Microsecond) + slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Microsecond) fakeStorage := slstorage.NewFakeStorage() sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings) diff --git a/pkg/sql/sqlliveness/slstorage/slstorage_test.go b/pkg/sql/sqlliveness/slstorage/slstorage_test.go index 1bd2022f6459..b1a3cf86fce7 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage_test.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage_test.go @@ -102,7 +102,7 @@ func TestStorage(t *testing.T) { t.Run("delete-update", func(t *testing.T) { clock, timeSource, settings, stopper, storage := setup(t) defer stopper.Stop(ctx) - slstorage.GCJitter.Override(&settings.SV, 0) + slstorage.GCJitter.Override(ctx, &settings.SV, 0) storage.Start(ctx) metrics := storage.Metrics() @@ -320,7 +320,7 @@ func TestConcurrentAccessesAndEvictions(t *testing.T) { settings := cluster.MakeTestingClusterSettings() stopper := stop.NewStopper() defer stopper.Stop(ctx) - slstorage.CacheSize.Override(&settings.SV, 10) + slstorage.CacheSize.Override(ctx, &settings.SV, 10) storage := slstorage.NewTestingStorage(stopper, clock, kvDB, ie, settings, dbName, timeSource.NewTimer) storage.Start(ctx) diff --git a/pkg/sql/stats/automatic_stats_test.go b/pkg/sql/stats/automatic_stats_test.go index aa64e9b76eb4..eb84527961cd 100644 --- a/pkg/sql/stats/automatic_stats_test.go +++ b/pkg/sql/stats/automatic_stats_test.go @@ -48,8 +48,8 @@ func TestMaybeRefreshStats(t *testing.T) { evalCtx := tree.NewTestingEvalContext(st) defer evalCtx.Stop(ctx) - AutomaticStatisticsClusterMode.Override(&st.SV, false) - AutomaticStatisticsMinStaleRows.Override(&st.SV, 5) + AutomaticStatisticsClusterMode.Override(ctx, &st.SV, false) + AutomaticStatisticsMinStaleRows.Override(ctx, &st.SV, 5) sqlRun := sqlutils.MakeSQLRunner(sqlDB) sqlRun.Exec(t, @@ -130,7 +130,7 @@ func TestAverageRefreshTime(t *testing.T) { evalCtx := tree.NewTestingEvalContext(st) defer evalCtx.Stop(ctx) - AutomaticStatisticsClusterMode.Override(&st.SV, false) + AutomaticStatisticsClusterMode.Override(ctx, &st.SV, false) sqlRun := sqlutils.MakeSQLRunner(sqlDB) sqlRun.Exec(t, @@ -372,7 +372,7 @@ func TestAutoStatsReadOnlyTables(t *testing.T) { defer s.Stopper().Stop(ctx) st := cluster.MakeTestingClusterSettings() - AutomaticStatisticsClusterMode.Override(&st.SV, false) + AutomaticStatisticsClusterMode.Override(ctx, &st.SV, false) evalCtx := tree.NewTestingEvalContext(st) defer evalCtx.Stop(ctx) @@ -399,7 +399,7 @@ func TestAutoStatsReadOnlyTables(t *testing.T) { ) refresher := MakeRefresher(st, executor, cache, time.Microsecond /* asOfTime */) - AutomaticStatisticsClusterMode.Override(&st.SV, true) + AutomaticStatisticsClusterMode.Override(ctx, &st.SV, true) if err := refresher.Start( ctx, s.Stopper(), time.Millisecond, /* refreshInterval */ @@ -466,7 +466,7 @@ func TestMutationsChannel(t *testing.T) { evalCtx := tree.NewTestingEvalContext(st) defer evalCtx.Stop(ctx) - AutomaticStatisticsClusterMode.Override(&st.SV, true) + AutomaticStatisticsClusterMode.Override(ctx, &st.SV, true) r := Refresher{ st: st, mutations: make(chan mutation, refreshChanBufferLen), @@ -492,7 +492,7 @@ func TestDefaultColumns(t *testing.T) { defer s.Stopper().Stop(ctx) st := cluster.MakeTestingClusterSettings() - AutomaticStatisticsClusterMode.Override(&st.SV, false) + AutomaticStatisticsClusterMode.Override(ctx, &st.SV, false) evalCtx := tree.NewTestingEvalContext(st) defer evalCtx.Stop(ctx) diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go index 7ddac2020a3a..81e504ba9e8b 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go @@ -136,7 +136,7 @@ func (r *Registry) poll(ctx context.Context) { lastPoll = timeutil.Now() } ) - pollingInterval.SetOnChange(&r.st.SV, func() { + pollingInterval.SetOnChange(&r.st.SV, func(ctx context.Context) { select { case pollIntervalChanged <- struct{}{}: default: diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go index e22ee15caec1..381ae220597a 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go @@ -154,6 +154,7 @@ func TestDiagnosticsRequestDifferentNode(t *testing.T) { // TestChangePollInterval ensures that changing the polling interval takes effect. func TestChangePollInterval(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() // We'll inject a request filter to detect scans due to the polling. tableStart := keys.SystemSQLCodec.TablePrefix(keys.StatementDiagnosticsRequestsTableID) @@ -190,7 +191,7 @@ func TestChangePollInterval(t *testing.T) { // Set an extremely long initial polling interval to not hit flakes due to // server startup taking more than 10s. - stmtdiagnostics.PollingInterval.Override(&settings.SV, time.Hour) + stmtdiagnostics.PollingInterval.Override(ctx, &settings.SV, time.Hour) args := base.TestServerArgs{ Settings: settings, Knobs: base.TestingKnobs{ @@ -211,7 +212,6 @@ func TestChangePollInterval(t *testing.T) { }, } s, db, _ := serverutils.StartServer(t, args) - ctx := context.Background() defer s.Stopper().Stop(ctx) require.Equal(t, 1, waitForScans(1)) diff --git a/pkg/sql/tests/monotonic_insert_test.go b/pkg/sql/tests/monotonic_insert_test.go index e3c769668991..a6337779e532 100644 --- a/pkg/sql/tests/monotonic_insert_test.go +++ b/pkg/sql/tests/monotonic_insert_test.go @@ -105,11 +105,11 @@ func testMonotonicInserts(t *testing.T, distSQLMode sessiondata.DistSQLExecMode) for _, server := range tc.Servers { st := server.ClusterSettings() st.Manual.Store(true) - sql.DistSQLClusterExecMode.Override(&st.SV, int64(distSQLMode)) + sql.DistSQLClusterExecMode.Override(ctx, &st.SV, int64(distSQLMode)) // Let transactions push immediately to detect deadlocks. The test creates a // large amount of contention and dependency cycles, and could take a long // time to complete without this. - concurrency.LockTableDeadlockDetectionPushDelay.Override(&st.SV, 0) + concurrency.LockTableDeadlockDetectionPushDelay.Override(ctx, &st.SV, 0) } var clients []mtClient diff --git a/pkg/sql/tests/repair_test.go b/pkg/sql/tests/repair_test.go index 6ceb5a67d841..14b54299cc4b 100644 --- a/pkg/sql/tests/repair_test.go +++ b/pkg/sql/tests/repair_test.go @@ -76,7 +76,7 @@ func TestDescriptorRepairOrphanedDescriptors(t *testing.T) { s, db, cleanup := setup(t) defer cleanup() - descs.ValidateOnWriteEnabled.Override(&s.ClusterSettings().SV, false) + descs.ValidateOnWriteEnabled.Override(ctx, &s.ClusterSettings().SV, false) require.NoError(t, crdb.ExecuteTx(ctx, db, nil, func(tx *gosql.Tx) error { if _, err := tx.Exec( "SELECT crdb_internal.unsafe_upsert_descriptor($1, decode($2, 'hex'));", @@ -87,7 +87,7 @@ func TestDescriptorRepairOrphanedDescriptors(t *testing.T) { parentID, schemaID, tableName, descID) return err })) - descs.ValidateOnWriteEnabled.Override(&s.ClusterSettings().SV, true) + descs.ValidateOnWriteEnabled.Override(ctx, &s.ClusterSettings().SV, true) // Ideally we should be able to query `crdb_internal.invalid_object` but it // does not do enough validation. Instead we'll just observe the issue that @@ -127,7 +127,7 @@ func TestDescriptorRepairOrphanedDescriptors(t *testing.T) { s, db, cleanup := setup(t) defer cleanup() - descs.ValidateOnWriteEnabled.Override(&s.ClusterSettings().SV, false) + descs.ValidateOnWriteEnabled.Override(ctx, &s.ClusterSettings().SV, false) require.NoError(t, crdb.ExecuteTx(ctx, db, nil, func(tx *gosql.Tx) error { if _, err := tx.Exec( "SELECT crdb_internal.unsafe_upsert_descriptor($1, decode($2, 'hex'));", @@ -138,7 +138,7 @@ func TestDescriptorRepairOrphanedDescriptors(t *testing.T) { parentID, schemaID, tableName, descID) return err })) - descs.ValidateOnWriteEnabled.Override(&s.ClusterSettings().SV, true) + descs.ValidateOnWriteEnabled.Override(ctx, &s.ClusterSettings().SV, true) // Ideally we should be able to query `crdb_internal.invalid_objects` but it // does not do enough validation. Instead we'll just observe the issue that @@ -435,11 +435,11 @@ SELECT crdb_internal.unsafe_delete_namespace_entry("parentID", 0, 'foo', id) now := s.Clock().Now().GoTime() defer cleanup() tdb := sqlutils.MakeSQLRunner(db) - descs.ValidateOnWriteEnabled.Override(&s.ClusterSettings().SV, false) + descs.ValidateOnWriteEnabled.Override(ctx, &s.ClusterSettings().SV, false) for _, op := range tc.before { tdb.Exec(t, op) } - descs.ValidateOnWriteEnabled.Override(&s.ClusterSettings().SV, true) + descs.ValidateOnWriteEnabled.Override(ctx, &s.ClusterSettings().SV, true) _, err := db.Exec(tc.op) if tc.expErrRE == "" { require.NoError(t, err) diff --git a/pkg/sql/tests/truncate_test.go b/pkg/sql/tests/truncate_test.go index cb52461b437b..e374f0399bc7 100644 --- a/pkg/sql/tests/truncate_test.go +++ b/pkg/sql/tests/truncate_test.go @@ -67,7 +67,7 @@ func TestTruncateWithConcurrentMutations(t *testing.T) { ) { settings := cluster.MakeTestingClusterSettings() - stats.AutomaticStatisticsClusterMode.Override(&settings.SV, false) + stats.AutomaticStatisticsClusterMode.Override(ctx, &settings.SV, false) scKnobs := &sql.SchemaChangerTestingKnobs{} blockFunc := func(jobID jobspb.JobID) error { select { diff --git a/pkg/storage/cloud/httpsink/http_storage_test.go b/pkg/storage/cloud/httpsink/http_storage_test.go index 912f7c06fce4..cef338eb1afa 100644 --- a/pkg/storage/cloud/httpsink/http_storage_test.go +++ b/pkg/storage/cloud/httpsink/http_storage_test.go @@ -52,6 +52,7 @@ func TestPutHttp(t *testing.T) { const badHeadResponse = "bad-head-response" user := security.RootUserName() + ctx := context.Background() makeServer := func() (*url.URL, func() int, func()) { var files int @@ -89,17 +90,13 @@ func TestPutHttp(t *testing.T) { })) u := testSettings.MakeUpdater() - if err := u.Set( - "cloudstorage.http.custom_ca", - string(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: srv.Certificate().Raw})), - "s", - ); err != nil { + if err := u.Set(ctx, "cloudstorage.http.custom_ca", string(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: srv.Certificate().Raw})), "s"); err != nil { t.Fatal(err) } cleanup := func() { srv.Close() - if err := u.Set("cloudstorage.http.custom_ca", "", "s"); err != nil { + if err := u.Set(ctx, "cloudstorage.http.custom_ca", "", "s"); err != nil { t.Fatal(err) } } diff --git a/pkg/storage/in_mem.go b/pkg/storage/in_mem.go index 6c34ca120ba4..aa04e246036c 100644 --- a/pkg/storage/in_mem.go +++ b/pkg/storage/in_mem.go @@ -76,6 +76,6 @@ func makeSettingsForSeparatedIntents(oldClusterVersion bool, enabled bool) *clus version = clusterversion.ByKey(clusterversion.V20_2) } settings := cluster.MakeTestingClusterSettingsWithVersions(version, version, true) - SeparatedIntentsEnabled.Override(&settings.SV, enabled) + SeparatedIntentsEnabled.Override(context.TODO(), &settings.SV, enabled) return settings } diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index 4ce659c6434f..18d7c5de3601 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -421,9 +421,10 @@ func TestPebbleSeparatorSuccessor(t *testing.T) { func TestPebbleDiskSlowEmit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() settings := cluster.MakeTestingClusterSettings() - MaxSyncDurationFatalOnExceeded.Override(&settings.SV, false) + MaxSyncDurationFatalOnExceeded.Override(ctx, &settings.SV, false) p := newPebbleInMem( context.Background(), roachpb.Attributes{}, diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index 8b68b3ecce00..d347291ccfb4 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -179,7 +179,8 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto Settings: cfg.Settings, HistogramWindowInterval: cfg.HistogramWindowInterval, }) - kvserver.TimeUntilStoreDead.Override(&cfg.Settings.SV, kvserver.TestTimeUntilStoreDead) + ctx := context.TODO() + kvserver.TimeUntilStoreDead.Override(ctx, &cfg.Settings.SV, kvserver.TestTimeUntilStoreDead) cfg.StorePool = kvserver.NewStorePool( cfg.AmbientCtx, cfg.Settings, @@ -191,7 +192,6 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto ) cfg.Transport = transport cfg.ClosedTimestampReceiver = sidetransport.NewReceiver(nc, ltc.stopper, ltc.Stores, nil /* testingKnobs */) - ctx := context.TODO() if err := kvserver.WriteClusterVersion(ctx, ltc.Eng, clusterversion.TestingClusterVersion); err != nil { t.Fatalf("unable to write cluster version: %s", err) diff --git a/pkg/ts/db_test.go b/pkg/ts/db_test.go index 8ef8a5d6ea21..21a23e05304c 100644 --- a/pkg/ts/db_test.go +++ b/pkg/ts/db_test.go @@ -792,8 +792,9 @@ func TestPollSource(t *testing.T) { // setting works properly. func TestDisableStorage(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() runTestCaseMultipleFormats(t, func(t *testing.T, tm testModelRunner) { - TimeseriesStorageEnabled.Override(&tm.Cfg.Settings.SV, false) + TimeseriesStorageEnabled.Override(ctx, &tm.Cfg.Settings.SV, false) // Basic storage operation: one data point. tm.storeTimeSeriesData(Resolution10s, []tspb.TimeSeriesData{ diff --git a/pkg/util/log/logcrash/crash_reporting_packet_test.go b/pkg/util/log/logcrash/crash_reporting_packet_test.go index cd567ccf4fd7..6ca260333142 100644 --- a/pkg/util/log/logcrash/crash_reporting_packet_test.go +++ b/pkg/util/log/logcrash/crash_reporting_packet_test.go @@ -62,7 +62,7 @@ func TestCrashReportingPacket(t *testing.T) { st := cluster.MakeTestingClusterSettings() // Enable all crash-reporting settings. - logcrash.DiagnosticsReportingEnabled.Override(&st.SV, true) + logcrash.DiagnosticsReportingEnabled.Override(ctx, &st.SV, true) defer logcrash.TestingSetCrashReportingURL("https://ignored:ignored@ignored/1234")() @@ -210,7 +210,7 @@ func TestInternalErrorReporting(t *testing.T) { st := cluster.MakeTestingClusterSettings() // Enable all crash-reporting settings. - logcrash.DiagnosticsReportingEnabled.Override(&st.SV, true) + logcrash.DiagnosticsReportingEnabled.Override(ctx, &st.SV, true) defer logcrash.TestingSetCrashReportingURL("https://ignored:ignored@ignored/1234")() diff --git a/pkg/util/log/logcrash/main_test.go b/pkg/util/log/logcrash/main_test.go index c680db5abb7a..08c7c0102d4a 100644 --- a/pkg/util/log/logcrash/main_test.go +++ b/pkg/util/log/logcrash/main_test.go @@ -11,6 +11,7 @@ package logcrash_test import ( + "context" "os" "testing" @@ -27,13 +28,14 @@ func TestMain(m *testing.M) { randutil.SeedForTests() security.SetAssetLoader(securitytest.EmbeddedAssets) serverutils.InitTestServerFactory(server.TestServerFactory) + ctx := context.Background() // MakeTestingClusterSettings initializes log.ReportingSettings to this // instance of setting values. // TODO(knz): This comment appears to be untrue. st := cluster.MakeTestingClusterSettings() - logcrash.DiagnosticsReportingEnabled.Override(&st.SV, false) - logcrash.CrashReports.Override(&st.SV, false) + logcrash.DiagnosticsReportingEnabled.Override(ctx, &st.SV, false) + logcrash.CrashReports.Override(ctx, &st.SV, false) os.Exit(m.Run()) } diff --git a/pkg/util/log/trace_client_test.go b/pkg/util/log/trace_client_test.go index 5c35bf23c74a..c862101b1c4f 100644 --- a/pkg/util/log/trace_client_test.go +++ b/pkg/util/log/trace_client_test.go @@ -22,7 +22,6 @@ import ( ) func TestTrace(t *testing.T) { - for _, tc := range []struct { name string init func(context.Context) (context.Context, *tracing.Span) @@ -55,8 +54,8 @@ func TestTrace(t *testing.T) { init: func(ctx context.Context) (context.Context, *tracing.Span) { tr := tracing.NewTracer() st := cluster.MakeTestingClusterSettings() - tracing.ZipkinCollector.Override(&st.SV, "127.0.0.1:9000000") - tr.Configure(&st.SV) + tracing.ZipkinCollector.Override(ctx, &st.SV, "127.0.0.1:9000000") + tr.Configure(ctx, &st.SV) return tr.StartSpanCtx(context.Background(), "foo") }, check: func(t *testing.T, ctx context.Context, sp *tracing.Span) { diff --git a/pkg/util/tracing/alloc_test.go b/pkg/util/tracing/alloc_test.go index 9eeda06d9314..ca10a6cb0ce1 100644 --- a/pkg/util/tracing/alloc_test.go +++ b/pkg/util/tracing/alloc_test.go @@ -29,7 +29,7 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) { tr := NewTracer() sv := settings.Values{} - tr.Configure(&sv) + tr.Configure(ctx, &sv) staticLogTags := logtags.Buffer{} staticLogTags.Add("foo", "bar") @@ -71,9 +71,10 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) { // BenchmarkSpan_GetRecording microbenchmarks GetRecording. func BenchmarkSpan_GetRecording(b *testing.B) { + ctx := context.Background() var sv settings.Values tr := NewTracer() - tr.Configure(&sv) + tr.Configure(ctx, &sv) sp := tr.StartSpan("foo", WithForceRealSpan()) diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index e339ae630dae..7b49758de95c 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -181,8 +181,8 @@ func NewTracer() *Tracer { // Configure sets up the Tracer according to the cluster settings (and keeps // it updated if they change). -func (t *Tracer) Configure(sv *settings.Values) { - reconfigure := func() { +func (t *Tracer) Configure(ctx context.Context, sv *settings.Values) { + reconfigure := func(ctx context.Context) { if lsToken := lightstepToken.Get(sv); lsToken != "" { t.setShadowTracer(createLightStepTracer(lsToken)) } else if zipkinAddr := ZipkinCollector.Get(sv); zipkinAddr != "" { @@ -199,7 +199,7 @@ func (t *Tracer) Configure(sv *settings.Values) { atomic.StoreInt32(&t._useNetTrace, nt) } - reconfigure() + reconfigure(ctx) enableNetTrace.SetOnChange(sv, reconfigure) lightstepToken.SetOnChange(sv, reconfigure)