Skip to content

Commit

Permalink
settings: plumb ctx to settings change callbacks
Browse files Browse the repository at this point in the history
And from those callbacks, to various settings setters.
Callbacks to widely-used libs like settings should always take contexts.

Release note: None
  • Loading branch information
andreimatei committed May 21, 2021
1 parent 41756ef commit 9048076
Show file tree
Hide file tree
Showing 99 changed files with 422 additions and 381 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (d *datadrivenTestState) addServer(
return errors.New("unable to parse tempCleanupFrequency during server creation")
}
settings := cluster.MakeTestingClusterSettings()
sql.TempObjectCleanupInterval.Override(&settings.SV, duration)
sql.TempObjectCleanupInterval.Override(context.Background(), &settings.SV, duration)
params.ServerArgs.Settings = settings
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (

func TestMaxImportBatchSize(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

testCases := []struct {
importBatchSize int64
Expand All @@ -61,8 +62,8 @@ func TestMaxImportBatchSize(t *testing.T) {
}
for i, testCase := range testCases {
st := cluster.MakeTestingClusterSettings()
storageccl.ImportBatchSize.Override(&st.SV, testCase.importBatchSize)
kvserver.MaxCommandSize.Override(&st.SV, testCase.maxCommandSize)
storageccl.ImportBatchSize.Override(ctx, &st.SV, testCase.importBatchSize)
kvserver.MaxCommandSize.Override(ctx, &st.SV, testCase.maxCommandSize)
if e, a := storageccl.MaxImportBatchSize(st), testCase.expected; e != a {
t.Errorf("%d: expected max batch size %d, but got %d", i, e, a)
}
Expand Down Expand Up @@ -163,14 +164,15 @@ func clientKVsToEngineKVs(kvs []kv.KeyValue) []storage.MVCCKeyValue {

func TestImport(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
t.Run("batch=default", func(t *testing.T) {
runTestImport(t, func(_ *cluster.Settings) {})
})
t.Run("batch=1", func(t *testing.T) {
// The test normally doesn't trigger the batching behavior, so lower
// the threshold to force it.
init := func(st *cluster.Settings) {
storageccl.ImportBatchSize.Override(&st.SV, 1)
storageccl.ImportBatchSize.Override(ctx, &st.SV, 1)
}
runTestImport(t, init)
})
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3186,6 +3186,7 @@ func TestChangefeedTelemetry(t *testing.T) {
func TestChangefeedMemBufferCapacityErrorRetryable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

// memLimitTest returns a test runner which starts numFeeds changefeeds,
// and verifies that memory limits are honored.
Expand Down Expand Up @@ -3230,7 +3231,7 @@ func TestChangefeedMemBufferCapacityErrorRetryable(t *testing.T) {
// Each changefeed gets enough memory to work by itself, but not enough
// to have all the changefeeds succeed.
changefeedbase.PerChangefeedMemLimit.Override(
&ff.Server().ClusterSettings().SV, 2*mon.DefaultPoolAllocationSize)
ctx, &ff.Server().ClusterSettings().SV, 2*mon.DefaultPoolAllocationSize)

// beforeEmitRowCh is used to block feeds from processing messages.
// This channel is closed below to speed up test termination.
Expand Down
13 changes: 8 additions & 5 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ func TestEvalFollowerReadOffset(t *testing.T) {
func TestZeroDurationDisablesFollowerReadOffset(t *testing.T) {
defer leaktest.AfterTest(t)()
defer utilccl.TestingEnableEnterprise()()
ctx := context.Background()

st := cluster.MakeTestingClusterSettings()
closedts.TargetDuration.Override(&st.SV, 0)
closedts.TargetDuration.Override(ctx, &st.SV, 0)
if offset, err := evalFollowerReadOffset(uuid.MakeV4(), st); err != nil {
t.Fatal(err)
} else if offset != math.MinInt64 {
Expand All @@ -84,6 +85,7 @@ func TestZeroDurationDisablesFollowerReadOffset(t *testing.T) {

func TestCanSendToFollower(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
clock := hlc.NewClock(hlc.UnixNano, base.DefaultMaxClockOffset)
stale := clock.Now().Add(2*expectedFollowerReadOffset.Nanoseconds(), 0)
current := clock.Now()
Expand Down Expand Up @@ -273,9 +275,9 @@ func TestCanSendToFollower(t *testing.T) {
defer utilccl.TestingEnableEnterprise()()
}
st := cluster.MakeTestingClusterSettings()
kvserver.FollowerReadsEnabled.Override(&st.SV, !c.disabledFollowerReads)
kvserver.FollowerReadsEnabled.Override(ctx, &st.SV, !c.disabledFollowerReads)
if c.zeroTargetDuration {
closedts.TargetDuration.Override(&st.SV, 0)
closedts.TargetDuration.Override(ctx, &st.SV, 0)
}

can := canSendToFollower(uuid.MakeV4(), st, clock, c.ctPolicy, c.ba)
Expand All @@ -286,13 +288,14 @@ func TestCanSendToFollower(t *testing.T) {

func TestFollowerReadMultipleValidation(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
defer func() {
if r := recover(); r == nil {
t.Fatalf("expected panic from setting followerReadMultiple to .1")
}
}()
st := cluster.MakeTestingClusterSettings()
followerReadMultiple.Override(&st.SV, .1)
followerReadMultiple.Override(ctx, &st.SV, .1)
}

// mockNodeStore implements the kvcoord.NodeDescStore interface.
Expand Down Expand Up @@ -470,7 +473,7 @@ func TestOracle(t *testing.T) {
defer utilccl.TestingEnableEnterprise()()
}
st := cluster.MakeTestingClusterSettings()
kvserver.FollowerReadsEnabled.Override(&st.SV, !c.disabledFollowerReads)
kvserver.FollowerReadsEnabled.Override(ctx, &st.SV, !c.disabledFollowerReads)

o := replicaoracle.NewOracle(followerReadOraclePolicy, replicaoracle.Config{
NodeDescs: nodes,
Expand Down
40 changes: 20 additions & 20 deletions pkg/ccl/oidcccl/authentication_oidc.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,35 +410,35 @@ var ConfigureOIDC = func(

reloadConfig(serverCtx, oidcAuthentication, locality, st)

OIDCEnabled.SetOnChange(&st.SV, func() {
reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st)
OIDCEnabled.SetOnChange(&st.SV, func(ctx context.Context) {
reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st)
})
OIDCClientID.SetOnChange(&st.SV, func() {
reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st)
OIDCClientID.SetOnChange(&st.SV, func(ctx context.Context) {
reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st)
})
OIDCClientSecret.SetOnChange(&st.SV, func() {
reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st)
OIDCClientSecret.SetOnChange(&st.SV, func(ctx context.Context) {
reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st)
})
OIDCRedirectURL.SetOnChange(&st.SV, func() {
reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st)
OIDCRedirectURL.SetOnChange(&st.SV, func(ctx context.Context) {
reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st)
})
OIDCProviderURL.SetOnChange(&st.SV, func() {
reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st)
OIDCProviderURL.SetOnChange(&st.SV, func(ctx context.Context) {
reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st)
})
OIDCScopes.SetOnChange(&st.SV, func() {
reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st)
OIDCScopes.SetOnChange(&st.SV, func(ctx context.Context) {
reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st)
})
OIDCClaimJSONKey.SetOnChange(&st.SV, func() {
reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st)
OIDCClaimJSONKey.SetOnChange(&st.SV, func(ctx context.Context) {
reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st)
})
OIDCPrincipalRegex.SetOnChange(&st.SV, func() {
reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st)
OIDCPrincipalRegex.SetOnChange(&st.SV, func(ctx context.Context) {
reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st)
})
OIDCButtonText.SetOnChange(&st.SV, func() {
reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st)
OIDCButtonText.SetOnChange(&st.SV, func(ctx context.Context) {
reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st)
})
OIDCAutoLogin.SetOnChange(&st.SV, func() {
reloadConfig(ambientCtx.AnnotateCtx(context.Background()), oidcAuthentication, locality, st)
OIDCAutoLogin.SetOnChange(&st.SV, func(ctx context.Context) {
reloadConfig(ambientCtx.AnnotateCtx(ctx), oidcAuthentication, locality, st)
})

return oidcAuthentication, nil
Expand Down
12 changes: 8 additions & 4 deletions pkg/ccl/utilccl/license_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestSettingAndCheckingLicense(t *testing.T) {
idA, _ := uuid.FromString("A0000000-0000-0000-0000-00000000000A")
idB, _ := uuid.FromString("B0000000-0000-0000-0000-00000000000B")

ctx := context.Background()
t0 := timeutil.Unix(0, 0)

licA, _ := (&licenseccl.License{
Expand Down Expand Up @@ -60,7 +61,7 @@ func TestSettingAndCheckingLicense(t *testing.T) {
{"", idA, t0, "requires an enterprise license"},
} {
updater := st.MakeUpdater()
if err := updater.Set("enterprise.license", tc.lic, "s"); err != nil {
if err := updater.Set(ctx, "enterprise.license", tc.lic, "s"); err != nil {
t.Fatal(err)
}
err := checkEnterpriseEnabledAt(st, tc.checkTime, tc.checkCluster, "", "", true)
Expand All @@ -72,6 +73,7 @@ func TestSettingAndCheckingLicense(t *testing.T) {
}

func TestGetLicenseTypePresent(t *testing.T) {
ctx := context.Background()
for _, tc := range []struct {
licenseType licenseccl.License_Type
expected string
Expand All @@ -87,7 +89,7 @@ func TestGetLicenseTypePresent(t *testing.T) {
Type: tc.licenseType,
ValidUntilUnixSec: 0,
}).Encode()
if err := updater.Set("enterprise.license", lic, "s"); err != nil {
if err := updater.Set(ctx, "enterprise.license", lic, "s"); err != nil {
t.Fatal(err)
}
actual, err := getLicenseType(st)
Expand All @@ -112,14 +114,15 @@ func TestGetLicenseTypeAbsent(t *testing.T) {
}

func TestSettingBadLicenseStrings(t *testing.T) {
ctx := context.Background()
for _, tc := range []struct{ lic, err string }{
{"blah", "invalid license string"},
{"cl-0-blah", "invalid license string"},
} {
st := cluster.MakeTestingClusterSettings()
u := st.MakeUpdater()

if err := u.Set("enterprise.license", tc.lic, "s"); !testutils.IsError(
if err := u.Set(ctx, "enterprise.license", tc.lic, "s"); !testutils.IsError(
err, tc.err,
) {
t.Fatalf("%q: expected err %q, got %v", tc.lic, tc.err, err)
Expand All @@ -128,6 +131,7 @@ func TestSettingBadLicenseStrings(t *testing.T) {
}

func TestTimeToEnterpriseLicenseExpiry(t *testing.T) {
ctx := context.Background()
id, _ := uuid.FromString("A0000000-0000-0000-0000-00000000000A")

t0 := timeutil.Unix(1603926294, 0)
Expand Down Expand Up @@ -171,7 +175,7 @@ func TestTimeToEnterpriseLicenseExpiry(t *testing.T) {
{"No License", "", 0},
} {
t.Run(tc.desc, func(t *testing.T) {
if err := updater.Set("enterprise.license", tc.lic, "s"); err != nil {
if err := updater.Set(ctx, "enterprise.license", tc.lic, "s"); err != nil {
t.Fatal(err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/connect_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestNodeJoin(t *testing.T) {
defer cancel()

settings := cluster.MakeTestingClusterSettings()
sql.FeatureTLSAutoJoinEnabled.Override(&settings.SV, true)
sql.FeatureTLSAutoJoinEnabled.Override(ctx, &settings.SV, true)
s, sqldb, _ := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestNodeJoinBadToken(t *testing.T) {
defer cancel()

settings := cluster.MakeTestingClusterSettings()
sql.FeatureTLSAutoJoinEnabled.Override(&settings.SV, true)
sql.FeatureTLSAutoJoinEnabled.Override(ctx, &settings.SV, true)
s, sqldb, _ := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
Expand Down
3 changes: 2 additions & 1 deletion pkg/cli/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package cli

import (
"context"
"crypto/rand"
"fmt"
"io"
Expand Down Expand Up @@ -205,7 +206,7 @@ Output the list of cluster settings known to this binary.

// Fill a Values struct with the defaults.
s := cluster.MakeTestingClusterSettings()
settings.NewUpdater(&s.SV).ResetRemaining()
settings.NewUpdater(&s.SV).ResetRemaining(context.Background())

var rows [][]string
for _, name := range settings.Keys() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/zip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func TestPartialZip(t *testing.T) {
// is no risk to see the override bumped due to a gossip update
// because this setting is not otherwise set in the test cluster.
s := tc.Server(0)
kvserver.TimeUntilStoreDead.Override(&s.ClusterSettings().SV, kvserver.TestTimeUntilStoreDead)
kvserver.TimeUntilStoreDead.Override(ctx, &s.ClusterSettings().SV, kvserver.TestTimeUntilStoreDead)

// This last case may take a little while to converge. To make this work with datadriven and at the same
// time retain the ability to use the `-rewrite` flag, we use a retry loop within that already checks the
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusterversion/clusterversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (v *handleImpl) SetActiveVersion(ctx context.Context, cv ClusterVersion) er
return err
}

version.SetInternal(v.sv, encoded)
version.SetInternal(ctx, v.sv, encoded)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/clusterversion/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (cv *clusterVersionSetting) initialize(
if err != nil {
return err
}
cv.SetInternal(sv, encoded)
cv.SetInternal(ctx, sv, encoded)
return nil
}

Expand Down
15 changes: 8 additions & 7 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,30 +202,31 @@ func TestJobSchedulerDaemonInitialScanDelay(t *testing.T) {

func getScopedSettings() (*settings.Values, func()) {
sv := &settings.Values{}
sv.Init(nil)
sv.Init(context.Background(), nil)
return sv, settings.TestingSaveRegistry()
}

func TestJobSchedulerDaemonGetWaitPeriod(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

sv, cleanup := getScopedSettings()
defer cleanup()

schedulerEnabledSetting.Override(sv, false)
schedulerEnabledSetting.Override(ctx, sv, false)

// When disabled, we wait 5 minutes before rechecking.
require.EqualValues(t, 5*time.Minute, getWaitPeriod(sv, nil))
schedulerEnabledSetting.Override(sv, true)
schedulerEnabledSetting.Override(ctx, sv, true)

// When pace is too low, we use something more reasonable.
schedulerPaceSetting.Override(sv, time.Nanosecond)
schedulerPaceSetting.Override(ctx, sv, time.Nanosecond)
require.EqualValues(t, minPacePeriod, getWaitPeriod(sv, nil))

// Otherwise, we use user specified setting.
pace := 42 * time.Second
schedulerPaceSetting.Override(sv, pace)
schedulerPaceSetting.Override(ctx, sv, pace)
require.EqualValues(t, pace, getWaitPeriod(sv, nil))
}

Expand Down Expand Up @@ -288,7 +289,7 @@ func TestJobSchedulerCanBeDisabledWhileSleeping(t *testing.T) {

knobs := fastDaemonKnobs(func() time.Duration {
// Disable daemon
schedulerEnabledSetting.Override(&h.cfg.Settings.SV, false)
schedulerEnabledSetting.Override(ctx, &h.cfg.Settings.SV, false)

// Before we return, create a job which should not be executed
// (since the daemon is disabled). We use our special executor
Expand Down Expand Up @@ -422,7 +423,7 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) {
// Advance our fake time 1 hour forward (plus a bit) so that the daemon finds matching jobs.
h.env.AdvanceTime(time.Hour + time.Second)
const jobsPerIteration = 2
schedulerMaxJobsPerIterationSetting.Override(&h.cfg.Settings.SV, jobsPerIteration)
schedulerMaxJobsPerIterationSetting.Override(ctx, &h.cfg.Settings.SV, jobsPerIteration)

// Make daemon execute initial scan immediately, but block subsequent scans.
h.cfg.TestingKnobs = fastDaemonKnobs(overridePaceSetting(time.Hour))
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ UPDATE system.jobs
if err := stopper.RunAsyncTask(context.Background(), "jobs/gc", func(ctx context.Context) {
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
settingChanged := make(chan struct{}, 1)
gcSetting.SetOnChange(&r.settings.SV, func() {
gcSetting.SetOnChange(&r.settings.SV, func(ctx context.Context) {
select {
case settingChanged <- struct{}{}:
default:
Expand Down
6 changes: 3 additions & 3 deletions pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ func TestRegistryResumeExpiredLease(t *testing.T) {
defer s.Stopper().Stop(ctx)

// Disable leniency for instant expiration
jobs.LeniencySetting.Override(&s.ClusterSettings().SV, 0)
jobs.LeniencySetting.Override(ctx, &s.ClusterSettings().SV, 0)
const cancelInterval = time.Duration(math.MaxInt64)
const adoptInterval = time.Microsecond
slinstance.DefaultTTL.Override(&s.ClusterSettings().SV, 2*adoptInterval)
slinstance.DefaultHeartBeat.Override(&s.ClusterSettings().SV, adoptInterval)
slinstance.DefaultTTL.Override(ctx, &s.ClusterSettings().SV, 2*adoptInterval)
slinstance.DefaultHeartBeat.Override(ctx, &s.ClusterSettings().SV, adoptInterval)

db := s.DB()
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
ds.clusterID = &cfg.RPCContext.ClusterID
ds.asyncSenderSem = quotapool.NewIntPool("DistSender async concurrency",
uint64(senderConcurrencyLimit.Get(&cfg.Settings.SV)))
senderConcurrencyLimit.SetOnChange(&cfg.Settings.SV, func() {
senderConcurrencyLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
ds.asyncSenderSem.UpdateCapacity(uint64(senderConcurrencyLimit.Get(&cfg.Settings.SV)))
})
ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper"))
Expand Down
Loading

0 comments on commit 9048076

Please sign in to comment.