From d632d44d1095d0519aae4f858e286f178e46dd5a Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Mon, 25 Sep 2023 15:51:43 +0200 Subject: [PATCH] settingswatcher: call a function upon updates to TenantReadOnly settings In a later commit, we will want to feed changes to TenantReadOnly settings in the system tenant's `system.settings` table into the connector, for use by the virtual cluster servers. This commit sets up the first building block: an extra callback function in the settings watcher that gets called when the watcher detects a change to TenantReadOnly settings. Release note: None --- .../settingswatcher/settings_watcher.go | 122 +++++++++++++++++- .../settings_watcher_external_test.go | 101 +++++++++++++++ pkg/settings/registry.go | 18 +++ 3 files changed, 234 insertions(+), 7 deletions(-) diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index 44ea86e75a32..7849db6c709f 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -14,6 +14,7 @@ package settingswatcher import ( "context" + "sort" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" @@ -69,6 +70,16 @@ type SettingsWatcher struct { updateWait chan struct{} } + // notifyTenantReadOnlyChange is called when one or more + // TenantReadOnly setting changes. It is only set when the + // SettingsWatcher is created with NewWithNotifier. It is used by + // the tenant setting override watcher to pick up defaults set via + // system.settings in the system tenant. + // + // The callee function can assume that the slice in the second + // argument is sorted by InternalKey. + notifyTenantReadOnlyChange func(context.Context, []kvpb.TenantSetting) + // testingWatcherKnobs allows the client to inject testing knobs into // the underlying rangefeedcache.Watcher. testingWatcherKnobs *rangefeedcache.TestingKnobs @@ -105,6 +116,23 @@ func New( return s } +// NewWithNotifier constructs a new SettingsWatcher which notifies +// an observer about changes to TenantReadOnly settings. +func NewWithNotifier( + ctx context.Context, + clock *hlc.Clock, + codec keys.SQLCodec, + settingsToUpdate *cluster.Settings, + f *rangefeed.Factory, + stopper *stop.Stopper, + notify func(context.Context, []kvpb.TenantSetting), + storage Storage, // optional +) *SettingsWatcher { + w := New(clock, codec, settingsToUpdate, f, stopper, storage) + w.notifyTenantReadOnlyChange = notify + return w +} + // NewWithOverrides constructs a new SettingsWatcher which allows external // overrides, discovered through an OverridesMonitor. func NewWithOverrides( @@ -126,6 +154,14 @@ func NewWithOverrides( // have been retrieved. An error will be returned if the context is canceled or // the stopper is stopped prior to the initial data being retrieved. func (s *SettingsWatcher) Start(ctx context.Context) error { + // Ensure we inform the read-only default notify callback function + // of the build-time defaults for TenantReadOnly settings. + // + // Note: we cannot call this in the New() function above because + // this can only be called after the in-RAM values have been loaded + // from disk, which happens some time during server PreStart(). + s.loadInitialReadOnlyDefaults(ctx) + settingsTablePrefix := s.codec.TablePrefix(keys.SettingsTableID) settingsTableSpan := roachpb.Span{ Key: settingsTablePrefix, @@ -242,6 +278,35 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { } } +func (s *SettingsWatcher) loadInitialReadOnlyDefaults(ctx context.Context) { + if s.notifyTenantReadOnlyChange == nil { + return + } + + // When there is no explicit value in system.settings for a TenantReadOnly + // setting, we still want to propagate the system tenant's idea + // of the default value as an override to secondary tenants. + // + // This is because the secondary tenant may be using another version + // of the executable, where there is another default value for the + // setting. We want to make sure that the secondary tenant's idea of + // the default value is the same as the system tenant's. + + tenantReadOnlyKeys := settings.TenantReadOnlyKeys() + payloads := make([]kvpb.TenantSetting, 0, len(tenantReadOnlyKeys)) + for _, key := range tenantReadOnlyKeys { + knownSetting, payload := s.getSettingAndValue(key) + if !knownSetting { + panic(errors.AssertionFailedf("programming error: unknown setting %s", key)) + } + payloads = append(payloads, payload) + } + // Make sure the payloads are sorted, as this is required by the + // notify API. + sort.Slice(payloads, func(i, j int) bool { return payloads[i].InternalKey < payloads[j].InternalKey }) + s.notifyTenantReadOnlyChange(ctx, payloads) +} + // TestingRestart restarts the rangefeeds and waits for the initial // update after the rangefeed update to be processed. func (s *SettingsWatcher) TestingRestart() { @@ -272,18 +337,20 @@ func (s *SettingsWatcher) handleKV( } settingKey := settings.InternalKey(settingKeyS) + setting, ok := settings.LookupForLocalAccessByKey(settingKey, s.codec.ForSystemTenant()) + if !ok { + log.Warningf(ctx, "unknown setting %s, skipping update", settingKey) + return nil + } if !s.codec.ForSystemTenant() { - setting, ok := settings.LookupForLocalAccessByKey(settingKey, s.codec.ForSystemTenant()) - if !ok { - log.Warningf(ctx, "unknown setting %s, skipping update", settingKey) - return nil - } if setting.Class() != settings.TenantWritable { log.Warningf(ctx, "ignoring read-only setting %s", settingKey) return nil } } + log.VEventf(ctx, 1, "found rangefeed event for %q = %+v (tombstone=%v)", settingKey, val, tombstone) + // Ensure that the update is persisted to the local cache before we // propagate the value to the in-RAM store. This ensures the latest // value will be reloaded from the cache if the service is @@ -301,7 +368,7 @@ func (s *SettingsWatcher) handleKV( val: val, ts: kv.Value.Timestamp, tombstone: tombstone, - }) + }, setting.Class()) if s.storage != nil { return kv } @@ -311,7 +378,7 @@ func (s *SettingsWatcher) handleKV( // maybeSet will update the stored value and the corresponding setting // in response to a kv event, assuming that event is new. func (s *SettingsWatcher) maybeSet( - ctx context.Context, key settings.InternalKey, sv settingsValue, + ctx context.Context, key settings.InternalKey, sv settingsValue, class settings.Class, ) { s.mu.Lock() defer s.mu.Unlock() @@ -336,6 +403,12 @@ func (s *SettingsWatcher) maybeSet( s.setLocked(ctx, key, sv.val, settings.OriginExplicitlySet) } } + + if class == settings.TenantReadOnly { + // Notify the tenant settings watcher there is a new fallback + // default for this setting. + s.setTenantReadOnlyDefault(ctx, key) + } } // settingValue tracks an observed value from the rangefeed. By tracking the @@ -434,6 +507,7 @@ func (s *SettingsWatcher) updateOverrides(ctx context.Context) (updateCh <-chan } // A new override was added or an existing override has changed. s.mu.overrides[key] = val + log.VEventf(ctx, 2, "applying override for %s = %q", key, val.Value) s.setLocked(ctx, key, val, settings.OriginExternallySet) } @@ -527,3 +601,37 @@ func (s *SettingsWatcher) GetClusterVersionFromStorage( func (s *SettingsWatcher) GetTenantClusterVersion() clusterversion.Handle { return s.settings.Version } + +// setTenantReadOnlyDefault is called by the watcher above for any +// changes to system.settings made on a setting with class +// TenantReadOnly. +func (s *SettingsWatcher) setTenantReadOnlyDefault(ctx context.Context, key settings.InternalKey) { + if s.notifyTenantReadOnlyChange == nil { + return + } + + found, payload := s.getSettingAndValue(key) + if !found { + // We are observing an update for a setting that does not exist + // (any more). This can happen if there was a customization in the + // system.settings table from a previous version and the setting + // was retired. + return + } + + log.VEventf(ctx, 1, "propagating read-only default %+v", payload) + + s.notifyTenantReadOnlyChange(ctx, []kvpb.TenantSetting{payload}) +} + +func (s *SettingsWatcher) getSettingAndValue(key settings.InternalKey) (bool, kvpb.TenantSetting) { + setting, ok := settings.LookupForLocalAccessByKey(key, settings.ForSystemTenant) + if !ok { + return false, kvpb.TenantSetting{} + } + payload := kvpb.TenantSetting{InternalKey: key, Value: settings.EncodedValue{ + Value: setting.Encoded(&s.settings.SV), + Type: setting.Typ(), + }} + return true, payload +} diff --git a/pkg/server/settingswatcher/settings_watcher_external_test.go b/pkg/server/settingswatcher/settings_watcher_external_test.go index 209be02d5f4a..389234d4373a 100644 --- a/pkg/server/settingswatcher/settings_watcher_external_test.go +++ b/pkg/server/settingswatcher/settings_watcher_external_test.go @@ -626,3 +626,104 @@ func TestStaleRowsDoNotCauseSettingsToRegress(t *testing.T) { require.NoError(t, stream.Send(newRangeFeedEvent(setting1KV, ts1))) settingStillHasValueAfterAShortWhile(t, defaultFakeSettingValue) } + +var _ = settings.RegisterStringSetting(settings.TenantReadOnly, "str.baz", "desc", "initial") +var _ = settings.RegisterStringSetting(settings.SystemOnly, "str.yay", "desc", "") + +// TestNotifyCalledUponReadOnlySettingChanges verifies that the notify +// function callback is called when a TenantReadOnly setting is +// updated in system.settings. +func TestNotifyCalledUponReadOnlySettingChanges(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{ + DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant, + }) + defer s.Stopper().Stop(ctx) + + sysDB := sqlutils.MakeSQLRunner(s.SystemLayer().SQLConn(t, "")) + + ts := s.ApplicationLayer() + st := ts.ClusterSettings() + stopper := ts.AppStopper() + + mu := struct { + syncutil.Mutex + updated []kvpb.TenantSetting + }{} + reset := func() { + mu.Lock() + defer mu.Unlock() + mu.updated = nil + } + contains := func(key settings.InternalKey) (bool, string) { + mu.Lock() + defer mu.Unlock() + for _, s := range mu.updated { + if s.InternalKey == key { + return true, s.Value.Value + } + } + return false, "" + } + + notify := func(_ context.Context, updated []kvpb.TenantSetting) { + mu.Lock() + defer mu.Unlock() + mu.updated = append(mu.updated, updated...) + } + + f, err := rangefeed.NewFactory(stopper, kvDB, st, &rangefeed.TestingKnobs{}) + require.NoError(t, err) + w := settingswatcher.NewWithNotifier(ctx, ts.Clock(), ts.Codec(), st, f, stopper, notify, nil) + require.NoError(t, w.Start(ctx)) + + t.Run("initial scan", func(t *testing.T) { + // The notifier is called at least once for all the + // pre-existing TenantReadOnly settings. + testutils.SucceedsSoon(t, func() error { + for _, k := range settings.TenantReadOnlyKeys() { + seen, v := contains(k) + if !seen { + return errors.Newf("%s not seen yet", k) + } + if k == "str.baz" { + require.Equal(t, "initial", v) + } + } + return nil + }) + }) + + t.Run("update", func(t *testing.T) { + reset() + + // Update a setting using SQL and verify the notifier is called for + // it eventually. Also verify that changes to other settings are + // not notified. + sysDB.Exec(t, "SET CLUSTER SETTING str.yay = 'newval'") + sysDB.Exec(t, "SET CLUSTER SETTING str.foo = 'newval'") + sysDB.Exec(t, "SET CLUSTER SETTING str.baz = 'newval'") + testutils.SucceedsSoon(t, func() error { + seen, v := contains("str.baz") + if !seen { + return errors.New("not seen yet") + } + require.Equal(t, "newval", v) + + // The rangefeed event for str.baz was delivered after those for + // str.foo and str.yay. If we had incorrectly notified an update + // for non-TenantReadOnly setting, they would show up in the + // updated list. + mu.Lock() + defer mu.Unlock() + if len(mu.updated) != 1 { + t.Errorf("expected 1 setting update, got: %+v", mu.updated) + } + return nil + }) + }) +} diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 7a1c4d121278..df7941da027d 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -26,6 +26,11 @@ import ( // read concurrently by different callers. var registry = make(map[InternalKey]internalSetting) +// tenantReadOnlyKeys contains the keys of settings that have the +// class TenantReadOnly. This is used to initialize defaults in the +// tenant settings watcher. +var tenantReadOnlyKeys []InternalKey + // aliasRegistry contains the mapping of names to keys, for names // different from the keys. var aliasRegistry = make(map[SettingName]aliasEntry) @@ -52,9 +57,12 @@ func TestingSaveRegistry() func() { for k, v := range aliasRegistry { origAliases[k] = v } + var origTenantReadOnlyKeys = make([]InternalKey, len(tenantReadOnlyKeys)) + copy(origTenantReadOnlyKeys, tenantReadOnlyKeys) return func() { registry = origRegistry aliasRegistry = origAliases + tenantReadOnlyKeys = origTenantReadOnlyKeys } } @@ -277,6 +285,9 @@ func register(class Class, key InternalKey, desc string, s internalSetting) { s.init(class, key, desc, slot) registry[key] = s slotTable[slot] = s + if class == TenantReadOnly { + tenantReadOnlyKeys = append(tenantReadOnlyKeys, key) + } } func registerAlias(key InternalKey, name SettingName, nameStatus NameStatus) { @@ -303,6 +314,13 @@ func Keys(forSystemTenant bool) (res []InternalKey) { return res } +// TenantReadOnlyKeys returns a array with all the known keys that +// have the class TenantReadOnly. It might not be sorted. +// The caller must refrain from modifying the return value. +func TenantReadOnlyKeys() []InternalKey { + return tenantReadOnlyKeys +} + // ConsoleKeys return an array with all cluster settings keys // used by the UI Console. // This list should only contain settings that have no sensitive