Skip to content

Commit

Permalink
settingswatcher: call a function upon updates to TenantReadOnly settings
Browse files Browse the repository at this point in the history
In a later commit, we will want to feed changes to TenantReadOnly
settings in the system tenant's `system.settings` table into the
connector, for use by the virtual cluster servers.

This commit sets up the first building block: an extra callback
function in the settings watcher that gets called when the watcher
detects a change to TenantReadOnly settings.

Release note: None
  • Loading branch information
knz committed Sep 29, 2023
1 parent c5af624 commit d632d44
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 7 deletions.
122 changes: 115 additions & 7 deletions pkg/server/settingswatcher/settings_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package settingswatcher

import (
"context"
"sort"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -69,6 +70,16 @@ type SettingsWatcher struct {
updateWait chan struct{}
}

// notifyTenantReadOnlyChange is called when one or more
// TenantReadOnly setting changes. It is only set when the
// SettingsWatcher is created with NewWithNotifier. It is used by
// the tenant setting override watcher to pick up defaults set via
// system.settings in the system tenant.
//
// The callee function can assume that the slice in the second
// argument is sorted by InternalKey.
notifyTenantReadOnlyChange func(context.Context, []kvpb.TenantSetting)

// testingWatcherKnobs allows the client to inject testing knobs into
// the underlying rangefeedcache.Watcher.
testingWatcherKnobs *rangefeedcache.TestingKnobs
Expand Down Expand Up @@ -105,6 +116,23 @@ func New(
return s
}

// NewWithNotifier constructs a new SettingsWatcher which notifies
// an observer about changes to TenantReadOnly settings.
func NewWithNotifier(
ctx context.Context,
clock *hlc.Clock,
codec keys.SQLCodec,
settingsToUpdate *cluster.Settings,
f *rangefeed.Factory,
stopper *stop.Stopper,
notify func(context.Context, []kvpb.TenantSetting),
storage Storage, // optional
) *SettingsWatcher {
w := New(clock, codec, settingsToUpdate, f, stopper, storage)
w.notifyTenantReadOnlyChange = notify
return w
}

// NewWithOverrides constructs a new SettingsWatcher which allows external
// overrides, discovered through an OverridesMonitor.
func NewWithOverrides(
Expand All @@ -126,6 +154,14 @@ func NewWithOverrides(
// have been retrieved. An error will be returned if the context is canceled or
// the stopper is stopped prior to the initial data being retrieved.
func (s *SettingsWatcher) Start(ctx context.Context) error {
// Ensure we inform the read-only default notify callback function
// of the build-time defaults for TenantReadOnly settings.
//
// Note: we cannot call this in the New() function above because
// this can only be called after the in-RAM values have been loaded
// from disk, which happens some time during server PreStart().
s.loadInitialReadOnlyDefaults(ctx)

settingsTablePrefix := s.codec.TablePrefix(keys.SettingsTableID)
settingsTableSpan := roachpb.Span{
Key: settingsTablePrefix,
Expand Down Expand Up @@ -242,6 +278,35 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
}
}

func (s *SettingsWatcher) loadInitialReadOnlyDefaults(ctx context.Context) {
if s.notifyTenantReadOnlyChange == nil {
return
}

// When there is no explicit value in system.settings for a TenantReadOnly
// setting, we still want to propagate the system tenant's idea
// of the default value as an override to secondary tenants.
//
// This is because the secondary tenant may be using another version
// of the executable, where there is another default value for the
// setting. We want to make sure that the secondary tenant's idea of
// the default value is the same as the system tenant's.

tenantReadOnlyKeys := settings.TenantReadOnlyKeys()
payloads := make([]kvpb.TenantSetting, 0, len(tenantReadOnlyKeys))
for _, key := range tenantReadOnlyKeys {
knownSetting, payload := s.getSettingAndValue(key)
if !knownSetting {
panic(errors.AssertionFailedf("programming error: unknown setting %s", key))
}
payloads = append(payloads, payload)
}
// Make sure the payloads are sorted, as this is required by the
// notify API.
sort.Slice(payloads, func(i, j int) bool { return payloads[i].InternalKey < payloads[j].InternalKey })
s.notifyTenantReadOnlyChange(ctx, payloads)
}

// TestingRestart restarts the rangefeeds and waits for the initial
// update after the rangefeed update to be processed.
func (s *SettingsWatcher) TestingRestart() {
Expand Down Expand Up @@ -272,18 +337,20 @@ func (s *SettingsWatcher) handleKV(
}
settingKey := settings.InternalKey(settingKeyS)

setting, ok := settings.LookupForLocalAccessByKey(settingKey, s.codec.ForSystemTenant())
if !ok {
log.Warningf(ctx, "unknown setting %s, skipping update", settingKey)
return nil
}
if !s.codec.ForSystemTenant() {
setting, ok := settings.LookupForLocalAccessByKey(settingKey, s.codec.ForSystemTenant())
if !ok {
log.Warningf(ctx, "unknown setting %s, skipping update", settingKey)
return nil
}
if setting.Class() != settings.TenantWritable {
log.Warningf(ctx, "ignoring read-only setting %s", settingKey)
return nil
}
}

log.VEventf(ctx, 1, "found rangefeed event for %q = %+v (tombstone=%v)", settingKey, val, tombstone)

// Ensure that the update is persisted to the local cache before we
// propagate the value to the in-RAM store. This ensures the latest
// value will be reloaded from the cache if the service is
Expand All @@ -301,7 +368,7 @@ func (s *SettingsWatcher) handleKV(
val: val,
ts: kv.Value.Timestamp,
tombstone: tombstone,
})
}, setting.Class())
if s.storage != nil {
return kv
}
Expand All @@ -311,7 +378,7 @@ func (s *SettingsWatcher) handleKV(
// maybeSet will update the stored value and the corresponding setting
// in response to a kv event, assuming that event is new.
func (s *SettingsWatcher) maybeSet(
ctx context.Context, key settings.InternalKey, sv settingsValue,
ctx context.Context, key settings.InternalKey, sv settingsValue, class settings.Class,
) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -336,6 +403,12 @@ func (s *SettingsWatcher) maybeSet(
s.setLocked(ctx, key, sv.val, settings.OriginExplicitlySet)
}
}

if class == settings.TenantReadOnly {
// Notify the tenant settings watcher there is a new fallback
// default for this setting.
s.setTenantReadOnlyDefault(ctx, key)
}
}

// settingValue tracks an observed value from the rangefeed. By tracking the
Expand Down Expand Up @@ -434,6 +507,7 @@ func (s *SettingsWatcher) updateOverrides(ctx context.Context) (updateCh <-chan
}
// A new override was added or an existing override has changed.
s.mu.overrides[key] = val
log.VEventf(ctx, 2, "applying override for %s = %q", key, val.Value)
s.setLocked(ctx, key, val, settings.OriginExternallySet)
}

Expand Down Expand Up @@ -527,3 +601,37 @@ func (s *SettingsWatcher) GetClusterVersionFromStorage(
func (s *SettingsWatcher) GetTenantClusterVersion() clusterversion.Handle {
return s.settings.Version
}

// setTenantReadOnlyDefault is called by the watcher above for any
// changes to system.settings made on a setting with class
// TenantReadOnly.
func (s *SettingsWatcher) setTenantReadOnlyDefault(ctx context.Context, key settings.InternalKey) {
if s.notifyTenantReadOnlyChange == nil {
return
}

found, payload := s.getSettingAndValue(key)
if !found {
// We are observing an update for a setting that does not exist
// (any more). This can happen if there was a customization in the
// system.settings table from a previous version and the setting
// was retired.
return
}

log.VEventf(ctx, 1, "propagating read-only default %+v", payload)

s.notifyTenantReadOnlyChange(ctx, []kvpb.TenantSetting{payload})
}

func (s *SettingsWatcher) getSettingAndValue(key settings.InternalKey) (bool, kvpb.TenantSetting) {
setting, ok := settings.LookupForLocalAccessByKey(key, settings.ForSystemTenant)
if !ok {
return false, kvpb.TenantSetting{}
}
payload := kvpb.TenantSetting{InternalKey: key, Value: settings.EncodedValue{
Value: setting.Encoded(&s.settings.SV),
Type: setting.Typ(),
}}
return true, payload
}
101 changes: 101 additions & 0 deletions pkg/server/settingswatcher/settings_watcher_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,3 +626,104 @@ func TestStaleRowsDoNotCauseSettingsToRegress(t *testing.T) {
require.NoError(t, stream.Send(newRangeFeedEvent(setting1KV, ts1)))
settingStillHasValueAfterAShortWhile(t, defaultFakeSettingValue)
}

var _ = settings.RegisterStringSetting(settings.TenantReadOnly, "str.baz", "desc", "initial")
var _ = settings.RegisterStringSetting(settings.SystemOnly, "str.yay", "desc", "")

// TestNotifyCalledUponReadOnlySettingChanges verifies that the notify
// function callback is called when a TenantReadOnly setting is
// updated in system.settings.
func TestNotifyCalledUponReadOnlySettingChanges(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
})
defer s.Stopper().Stop(ctx)

sysDB := sqlutils.MakeSQLRunner(s.SystemLayer().SQLConn(t, ""))

ts := s.ApplicationLayer()
st := ts.ClusterSettings()
stopper := ts.AppStopper()

mu := struct {
syncutil.Mutex
updated []kvpb.TenantSetting
}{}
reset := func() {
mu.Lock()
defer mu.Unlock()
mu.updated = nil
}
contains := func(key settings.InternalKey) (bool, string) {
mu.Lock()
defer mu.Unlock()
for _, s := range mu.updated {
if s.InternalKey == key {
return true, s.Value.Value
}
}
return false, ""
}

notify := func(_ context.Context, updated []kvpb.TenantSetting) {
mu.Lock()
defer mu.Unlock()
mu.updated = append(mu.updated, updated...)
}

f, err := rangefeed.NewFactory(stopper, kvDB, st, &rangefeed.TestingKnobs{})
require.NoError(t, err)
w := settingswatcher.NewWithNotifier(ctx, ts.Clock(), ts.Codec(), st, f, stopper, notify, nil)
require.NoError(t, w.Start(ctx))

t.Run("initial scan", func(t *testing.T) {
// The notifier is called at least once for all the
// pre-existing TenantReadOnly settings.
testutils.SucceedsSoon(t, func() error {
for _, k := range settings.TenantReadOnlyKeys() {
seen, v := contains(k)
if !seen {
return errors.Newf("%s not seen yet", k)
}
if k == "str.baz" {
require.Equal(t, "initial", v)
}
}
return nil
})
})

t.Run("update", func(t *testing.T) {
reset()

// Update a setting using SQL and verify the notifier is called for
// it eventually. Also verify that changes to other settings are
// not notified.
sysDB.Exec(t, "SET CLUSTER SETTING str.yay = 'newval'")
sysDB.Exec(t, "SET CLUSTER SETTING str.foo = 'newval'")
sysDB.Exec(t, "SET CLUSTER SETTING str.baz = 'newval'")
testutils.SucceedsSoon(t, func() error {
seen, v := contains("str.baz")
if !seen {
return errors.New("not seen yet")
}
require.Equal(t, "newval", v)

// The rangefeed event for str.baz was delivered after those for
// str.foo and str.yay. If we had incorrectly notified an update
// for non-TenantReadOnly setting, they would show up in the
// updated list.
mu.Lock()
defer mu.Unlock()
if len(mu.updated) != 1 {
t.Errorf("expected 1 setting update, got: %+v", mu.updated)
}
return nil
})
})
}
18 changes: 18 additions & 0 deletions pkg/settings/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
// read concurrently by different callers.
var registry = make(map[InternalKey]internalSetting)

// tenantReadOnlyKeys contains the keys of settings that have the
// class TenantReadOnly. This is used to initialize defaults in the
// tenant settings watcher.
var tenantReadOnlyKeys []InternalKey

// aliasRegistry contains the mapping of names to keys, for names
// different from the keys.
var aliasRegistry = make(map[SettingName]aliasEntry)
Expand All @@ -52,9 +57,12 @@ func TestingSaveRegistry() func() {
for k, v := range aliasRegistry {
origAliases[k] = v
}
var origTenantReadOnlyKeys = make([]InternalKey, len(tenantReadOnlyKeys))
copy(origTenantReadOnlyKeys, tenantReadOnlyKeys)
return func() {
registry = origRegistry
aliasRegistry = origAliases
tenantReadOnlyKeys = origTenantReadOnlyKeys
}
}

Expand Down Expand Up @@ -277,6 +285,9 @@ func register(class Class, key InternalKey, desc string, s internalSetting) {
s.init(class, key, desc, slot)
registry[key] = s
slotTable[slot] = s
if class == TenantReadOnly {
tenantReadOnlyKeys = append(tenantReadOnlyKeys, key)
}
}

func registerAlias(key InternalKey, name SettingName, nameStatus NameStatus) {
Expand All @@ -303,6 +314,13 @@ func Keys(forSystemTenant bool) (res []InternalKey) {
return res
}

// TenantReadOnlyKeys returns a array with all the known keys that
// have the class TenantReadOnly. It might not be sorted.
// The caller must refrain from modifying the return value.
func TenantReadOnlyKeys() []InternalKey {
return tenantReadOnlyKeys
}

// ConsoleKeys return an array with all cluster settings keys
// used by the UI Console.
// This list should only contain settings that have no sensitive
Expand Down

0 comments on commit d632d44

Please sign in to comment.