From ad326c11e31e27ced0cabf02bd9616e029425d10 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 29 Sep 2023 03:04:53 +0200 Subject: [PATCH 1/3] settingswatcher: move an inline function to a method (This function/method will also be deleted in a later commit.) Release note: None --- .../settingswatcher/settings_watcher.go | 45 ++++++++++--------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index 1a589a6cf128..89662a32afc1 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -45,7 +45,12 @@ type SettingsWatcher struct { f *rangefeed.Factory stopper *stop.Stopper dec RowDecoder - storage Storage + + // storage is used to persist a local cache of the setting + // overrides, for use when a node starts up before KV is ready. + storage Storage + // snapshot is what goes into the local cache. + snapshot []roachpb.KeyValue overridesMonitor OverridesMonitor @@ -194,24 +199,6 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { if s.storage != nil { bufferSize = settings.MaxSettings * 3 } - var snapshot []roachpb.KeyValue // used with storage - maybeUpdateSnapshot := func(update rangefeedcache.Update) { - // Only record the update to the buffer if we're writing to storage. - if s.storage == nil || - // and the update has some new information to write. - (update.Type == rangefeedcache.IncrementalUpdate && len(update.Events) == 0) { - return - } - eventKVs := rangefeedbuffer.EventsToKVs(update.Events, - rangefeedbuffer.RangeFeedValueEventToKV) - switch update.Type { - case rangefeedcache.CompleteUpdate: - snapshot = eventKVs - case rangefeedcache.IncrementalUpdate: - snapshot = rangefeedbuffer.MergeKVs(snapshot, eventKVs) - } - s.storage.SnapshotKVs(ctx, snapshot) - } c := rangefeedcache.NewWatcher( "settings-watcher", s.clock, s.f, @@ -223,7 +210,7 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { }, func(ctx context.Context, update rangefeedcache.Update) { noteUpdate(update) - maybeUpdateSnapshot(update) + s.maybeUpdateSnapshot(ctx, update) }, s.testingWatcherKnobs, ) @@ -255,6 +242,24 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { } } +func (s *SettingsWatcher) maybeUpdateSnapshot(ctx context.Context, update rangefeedcache.Update) { + // Only record the update to the buffer if we're writing to storage. + if s.storage == nil || + // and the update has some new information to write. + (update.Type == rangefeedcache.IncrementalUpdate && len(update.Events) == 0) { + return + } + eventKVs := rangefeedbuffer.EventsToKVs(update.Events, + rangefeedbuffer.RangeFeedValueEventToKV) + switch update.Type { + case rangefeedcache.CompleteUpdate: + s.snapshot = eventKVs + case rangefeedcache.IncrementalUpdate: + s.snapshot = rangefeedbuffer.MergeKVs(s.snapshot, eventKVs) + } + s.storage.SnapshotKVs(ctx, s.snapshot) +} + // TestingRestart restarts the rangefeeds and waits for the initial // update after the rangefeed update to be processed. func (s *SettingsWatcher) TestingRestart() { From 81ebdaf7c659663f89d0da1028f683753e1c71fa Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 29 Sep 2023 02:21:24 +0200 Subject: [PATCH 2/3] server,settingswatcher: properly evict entries from the local persisted cache (For context, on each node there is a local persisted cache of cluster setting customizations. 
This exists to ensure that configured values can be used even before a node has fully started up and can start reading customizations from `system.settings`.) Prior to this patch, entries were never evicted from the local persisted cache: when a cluster setting was reset, any previously saved entry in the cache would remain there. This is a very old bug, which was long hidden and was recently revealed when commit 2f5d717178a87b42fa94c20a226cc491d185455d was merged. In a nutshell, before this recent commit the code responsible to load the entries from the cache didn't fully work and so the stale entries were never restored from the cache. That commit fixed the loader code, and so the stale entries became active, which made the old bug visible. To fix the old bug, this present commit modifies the settings watcher to preserve KV deletion events, and propagates them to the persisted cache. (There is no release note because there is no user-facing release where the bug was visible.) Release note: None Co-authored-by: Steven Danna --- pkg/server/settings_cache.go | 16 ++++- pkg/server/settings_cache_test.go | 59 +++++++++++++++++++ .../settingswatcher/settings_watcher.go | 1 + .../settings_watcher_external_test.go | 9 ++- 4 files changed, 83 insertions(+), 2 deletions(-) diff --git a/pkg/server/settings_cache.go b/pkg/server/settings_cache.go index d3110d6ed6a9..2ac7bf68528b 100644 --- a/pkg/server/settings_cache.go +++ b/pkg/server/settings_cache.go @@ -99,10 +99,23 @@ var _ settingswatcher.Storage = (*settingsCacheWriter)(nil) func storeCachedSettingsKVs(ctx context.Context, eng storage.Engine, kvs []roachpb.KeyValue) error { batch := eng.NewBatch() defer batch.Close() + + // Remove previous entries -- they are now stale. + if _, _, _, err := storage.MVCCDeleteRange(ctx, batch, + keys.LocalStoreCachedSettingsKeyMin, + keys.LocalStoreCachedSettingsKeyMax, + 0 /* no limit */, hlc.Timestamp{}, storage.MVCCWriteOptions{}, false /* returnKeys */); err != nil { + return err + } + + // Now we can populate the cache with new entries. for _, kv := range kvs { kv.Value.Timestamp = hlc.Timestamp{} // nb: Timestamp is not part of checksum + cachedSettingsKey := keys.StoreCachedSettingsKey(kv.Key) + // A new value is added, or an existing value is updated. 
+ log.VEventf(ctx, 1, "storing cached setting: %s -> %+v", cachedSettingsKey, kv.Value) if err := storage.MVCCPut( - ctx, batch, keys.StoreCachedSettingsKey(kv.Key), hlc.Timestamp{}, kv.Value, storage.MVCCWriteOptions{}, + ctx, batch, cachedSettingsKey, hlc.Timestamp{}, kv.Value, storage.MVCCWriteOptions{}, ); err != nil { return err } @@ -151,6 +164,7 @@ func initializeCachedSettings( " skipping settings updates.") } settingKey := settings.InternalKey(settingKeyS) + log.VEventf(ctx, 1, "loaded cached setting: %s -> %+v", settingKey, val) if err := updater.Set(ctx, settingKey, val); err != nil { log.Warningf(ctx, "setting %q to %v failed: %+v", settingKey, val, err) } diff --git a/pkg/server/settings_cache_test.go b/pkg/server/settings_cache_test.go index c1cc463ac911..04f42296092d 100644 --- a/pkg/server/settings_cache_test.go +++ b/pkg/server/settings_cache_test.go @@ -13,6 +13,7 @@ package server import ( "context" "fmt" + "strings" "testing" "time" @@ -24,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -146,3 +148,60 @@ Actual: %+v return nil }) } + +func TestCachedSettingDeletionIsPersisted(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + hasKey := func(kvs []roachpb.KeyValue, key string) bool { + for _, kv := range kvs { + if strings.Contains(string(kv.Key), key) { + return true + } + } + return false + } + + ctx := context.Background() + + ts, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant, + }) + defer ts.Stopper().Stop(ctx) + db := sqlutils.MakeSQLRunner(sqlDB) + + // Make the test faster. + st := ts.ClusterSettings() + closedts.TargetDuration.Override(ctx, &st.SV, 10*time.Millisecond) + closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 10*time.Millisecond) + kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, 10*time.Millisecond) + + // Customize a setting. + db.Exec(t, `SET CLUSTER SETTING ui.display_timezone = 'America/New_York'`) + // The setting won't propagate to the store until the setting watcher caches + // up with the rangefeed, which might take a while. + testutils.SucceedsSoon(t, func() error { + store, err := ts.GetStores().(*kvserver.Stores).GetStore(1) + require.NoError(t, err) + settings, err := loadCachedSettingsKVs(context.Background(), store.TODOEngine()) + require.NoError(t, err) + if !hasKey(settings, `ui.display_timezone`) { + return errors.New("cached setting not found") + } + return nil + }) + + // Reset the setting. + db.Exec(t, `RESET CLUSTER SETTING ui.display_timezone`) + // Check that the setting is eventually deleted from the store. 
+ testutils.SucceedsSoon(t, func() error { + store, err := ts.GetStores().(*kvserver.Stores).GetStore(1) + require.NoError(t, err) + settings, err := loadCachedSettingsKVs(context.Background(), store.TODOEngine()) + require.NoError(t, err) + if hasKey(settings, `ui.display_timezone`) { + return errors.New("cached setting was still found") + } + return nil + }) +} diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index 89662a32afc1..abc3eec6c3ef 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -199,6 +199,7 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { if s.storage != nil { bufferSize = settings.MaxSettings * 3 } + c := rangefeedcache.NewWatcher( "settings-watcher", s.clock, s.f, diff --git a/pkg/server/settingswatcher/settings_watcher_external_test.go b/pkg/server/settingswatcher/settings_watcher_external_test.go index 5e92467ec235..209be02d5f4a 100644 --- a/pkg/server/settingswatcher/settings_watcher_external_test.go +++ b/pkg/server/settingswatcher/settings_watcher_external_test.go @@ -216,7 +216,14 @@ type fakeStorage struct { func (f *fakeStorage) SnapshotKVs(ctx context.Context, kvs []roachpb.KeyValue) { f.Lock() defer f.Unlock() - f.kvs = kvs + nonDeletions := make([]roachpb.KeyValue, 0, len(kvs)) + for _, kv := range kvs { + if !kv.Value.IsPresent() { + continue + } + nonDeletions = append(nonDeletions, kv) + } + f.kvs = nonDeletions f.numWrites++ } From c5af6243bde3fdf75e51f7728914e01267a935cb Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 28 Sep 2023 18:00:43 +0200 Subject: [PATCH 3/3] settingswatcher: write-through to the persisted cache Prior to this patch, the rangefeed watcher over `system.settings` was updating the in-RAM value store before it propagated the updates to the persisted local cache. In fact, the update to the persisted local cache was lagging quite a bit behind, because the rangefeed watcher would buffer updates and only flush them after a while. As a result, the following sequence was possible: 1. client updates a cluster setting. 2. server is immediately shut down. The persisted cache has not been updated yet. 3. server is restarted. For a short while (until the settings watcher has caught up), the old version of the setting remains active. This recall of ghost values of a setting was simply a bug. This patch fixes that, by ensuring that the persisted cache is written through before the in-RAM value store. By doing this, we give up on batching updates to the persisted local store. This is deemed acceptable because cluster settings are not updated frequently. 
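In code terms, the new per-event ordering inside `(*SettingsWatcher).handleKV` is roughly the following (a condensed sketch of the hunk further down in this patch; decoding and error handling are elided, and the remaining fields of `settingsValue` are omitted):

    // Write-through: fold this single KV (including deletion events) into
    // the persisted snapshot and flush it to the local cache first...
    if s.storage != nil {
        s.snapshot = rangefeedbuffer.MergeKVs(s.snapshot, []roachpb.KeyValue{rkv})
        s.storage.SnapshotKVs(ctx, s.snapshot)
    }

    // ...and only then make the new value visible in the in-RAM store.
    s.maybeSet(ctx, settingKey, settingsValue{
        val: val,
        ts:  kv.Value.Timestamp,
        // (remaining fields as in the actual change below)
    })
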
Release note: None --- pkg/server/server.go | 12 ++- .../settingswatcher/settings_watcher.go | 40 +++++----- pkg/server/testing_knobs.go | 4 + pkg/settings/integration_tests/BUILD.bazel | 1 + .../integration_tests/settings_test.go | 74 +++++++++++++++++++ 5 files changed, 107 insertions(+), 24 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index b53e41eefad9..f2b03d10c81d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -2143,8 +2143,16 @@ func (s *topLevelServer) PreStart(ctx context.Context) error { return err } - if err := s.node.tenantSettingsWatcher.Start(workersCtx, s.sqlServer.execCfg.SystemTableIDResolver); err != nil { - return errors.Wrap(err, "failed to initialize the tenant settings watcher") + startSettingsWatcher := true + if serverKnobs := s.cfg.TestingKnobs.Server; serverKnobs != nil { + if serverKnobs.(*TestingKnobs).DisableSettingsWatcher { + startSettingsWatcher = false + } + } + if startSettingsWatcher { + if err := s.node.tenantSettingsWatcher.Start(workersCtx, s.sqlServer.execCfg.SystemTableIDResolver); err != nil { + return errors.Wrap(err, "failed to initialize the tenant settings watcher") + } } if err := s.tenantCapabilitiesWatcher.Start(ctx); err != nil { return errors.Wrap(err, "initializing tenant capabilities") diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index abc3eec6c3ef..44ea86e75a32 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -211,7 +211,6 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { }, func(ctx context.Context, update rangefeedcache.Update) { noteUpdate(update) - s.maybeUpdateSnapshot(ctx, update) }, s.testingWatcherKnobs, ) @@ -243,24 +242,6 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { } } -func (s *SettingsWatcher) maybeUpdateSnapshot(ctx context.Context, update rangefeedcache.Update) { - // Only record the update to the buffer if we're writing to storage. - if s.storage == nil || - // and the update has some new information to write. - (update.Type == rangefeedcache.IncrementalUpdate && len(update.Events) == 0) { - return - } - eventKVs := rangefeedbuffer.EventsToKVs(update.Events, - rangefeedbuffer.RangeFeedValueEventToKV) - switch update.Type { - case rangefeedcache.CompleteUpdate: - s.snapshot = eventKVs - case rangefeedcache.IncrementalUpdate: - s.snapshot = rangefeedbuffer.MergeKVs(s.snapshot, eventKVs) - } - s.storage.SnapshotKVs(ctx, s.snapshot) -} - // TestingRestart restarts the rangefeeds and waits for the initial // update after the rangefeed update to be processed. func (s *SettingsWatcher) TestingRestart() { @@ -276,11 +257,13 @@ func (s *SettingsWatcher) TestingRestart() { func (s *SettingsWatcher) handleKV( ctx context.Context, kv *kvpb.RangeFeedValue, ) rangefeedbuffer.Event { - var alloc tree.DatumAlloc - settingKeyS, val, tombstone, err := s.dec.DecodeRow(roachpb.KeyValue{ + rkv := roachpb.KeyValue{ Key: kv.Key, Value: kv.Value, - }, &alloc) + } + + var alloc tree.DatumAlloc + settingKeyS, val, tombstone, err := s.dec.DecodeRow(rkv, &alloc) if err != nil { // This should never happen: the rangefeed should only ever deliver valid SQL rows. err = errors.NewAssertionErrorWithWrappedErrf(err, "failed to decode settings row %v", kv.Key) @@ -301,6 +284,19 @@ func (s *SettingsWatcher) handleKV( } } + // Ensure that the update is persisted to the local cache before we + // propagate the value to the in-RAM store. 
This ensures the latest + // value will be reloaded from the cache if the service is + // interrupted abruptly after the new value is seen by a client. + // + // Note: it is because we really want the cache to be updated before + // the in-RAM store that we do this here instead of batching the + // updates in the onUpdate rangefeed function. + if s.storage != nil { + s.snapshot = rangefeedbuffer.MergeKVs(s.snapshot, []roachpb.KeyValue{rkv}) + s.storage.SnapshotKVs(ctx, s.snapshot) + } + s.maybeSet(ctx, settingKey, settingsValue{ val: val, ts: kv.Value.Timestamp, diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go index 9eaa7980c0b8..dcd77718bf28 100644 --- a/pkg/server/testing_knobs.go +++ b/pkg/server/testing_knobs.go @@ -161,6 +161,10 @@ type TestingKnobs struct { // DialNodeCallback is used to mock dial errors when dialing a node. It is // invoked by the dialNode method of server.serverIterator. DialNodeCallback func(ctx context.Context, nodeID roachpb.NodeID) error + + // DisableSettingsWatcher disables the watcher that monitors updates + // to system.settings. + DisableSettingsWatcher bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/settings/integration_tests/BUILD.bazel b/pkg/settings/integration_tests/BUILD.bazel index 7bbc72b18446..ad3ed76d2fea 100644 --- a/pkg/settings/integration_tests/BUILD.bazel +++ b/pkg/settings/integration_tests/BUILD.bazel @@ -19,5 +19,6 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/settings/integration_tests/settings_test.go b/pkg/settings/integration_tests/settings_test.go index 8b1910849d21..44443683a1b9 100644 --- a/pkg/settings/integration_tests/settings_test.go +++ b/pkg/settings/integration_tests/settings_test.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -24,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" ) const strKey = "testing.str" @@ -286,3 +288,75 @@ func TestSettingsShowAll(t *testing.T) { t.Fatalf("show all did not find the test keys: %q", rows) } } + +func TestSettingsPersistenceEndToEnd(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + // We're going to restart the test server, but expecting storage to + // persist. Define a sticky VFS for this purpose. + stickyVFSRegistry := server.NewStickyVFSRegistry() + serverKnobs := &server.TestingKnobs{ + StickyVFSRegistry: stickyVFSRegistry, + } + serverArgs := base.TestServerArgs{ + DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant, + StoreSpecs: []base.StoreSpec{ + {InMemory: true, StickyVFSID: "1"}, + }, + Knobs: base.TestingKnobs{ + Server: serverKnobs, + }, + } + + ts, sqlDB, _ := serverutils.StartServer(t, serverArgs) + defer ts.Stopper().Stop(ctx) + db := sqlutils.MakeSQLRunner(sqlDB) + + // We need a custom value for the cluster setting that's guaranteed + // to be different from the default. So check that it's not equal to + // the default always. 
+ const differentValue = `something` + + setting, _ := settings.LookupForLocalAccessByKey("cluster.organization", true) + s := setting.(*settings.StringSetting) + st := ts.ClusterSettings() + require.NotEqual(t, s.Get(&st.SV), differentValue) + origValue := db.QueryStr(t, `SHOW CLUSTER SETTING cluster.organization`)[0][0] + + // Customize the setting. + db.Exec(t, `SET CLUSTER SETTING cluster.organization = $1`, differentValue) + newValue := db.QueryStr(t, `SHOW CLUSTER SETTING cluster.organization`)[0][0] + + // Restart the server; verify the setting customization is preserved. + // For this we disable the settings watcher, to ensure that + // only the value loaded by the local persisted cache is used. + ts.Stopper().Stop(ctx) + serverKnobs.DisableSettingsWatcher = true + ts, sqlDB, _ = serverutils.StartServer(t, serverArgs) + defer ts.Stopper().Stop(ctx) + db = sqlutils.MakeSQLRunner(sqlDB) + + db.CheckQueryResults(t, `SHOW CLUSTER SETTING cluster.organization`, [][]string{{newValue}}) + + // Restart the server to make the setting writable again. + ts.Stopper().Stop(ctx) + serverKnobs.DisableSettingsWatcher = false + ts, sqlDB, _ = serverutils.StartServer(t, serverArgs) + defer ts.Stopper().Stop(ctx) + db = sqlutils.MakeSQLRunner(sqlDB) + + // Reset the setting, then check the original value is restored. + db.Exec(t, `RESET CLUSTER SETTING cluster.organization`) + db.CheckQueryResults(t, `SHOW CLUSTER SETTING cluster.organization`, [][]string{{origValue}}) + + // Restart the server; verify the original value is still there. + ts.Stopper().Stop(ctx) + ts, sqlDB, _ = serverutils.StartServer(t, serverArgs) + defer ts.Stopper().Stop(ctx) + db = sqlutils.MakeSQLRunner(sqlDB) + + db.CheckQueryResults(t, `SHOW CLUSTER SETTING cluster.organization`, [][]string{{origValue}}) +}
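
For completeness, after this series the store-side write path introduced in the second patch clears the cached-settings key range before re-writing the snapshot, so a setting that was RESET (and is therefore absent from the snapshot) actually disappears from the on-disk cache. A condensed sketch of `storeCachedSettingsKVs` as it stands after these patches (based on the hunk in the second patch; logging is elided and the final batch commit is assumed to keep its pre-existing form):

    func storeCachedSettingsKVs(ctx context.Context, eng storage.Engine, kvs []roachpb.KeyValue) error {
        batch := eng.NewBatch()
        defer batch.Close()

        // Remove all previously cached settings; anything not present in
        // the incoming snapshot is stale.
        if _, _, _, err := storage.MVCCDeleteRange(ctx, batch,
            keys.LocalStoreCachedSettingsKeyMin,
            keys.LocalStoreCachedSettingsKeyMax,
            0 /* no limit */, hlc.Timestamp{}, storage.MVCCWriteOptions{}, false /* returnKeys */,
        ); err != nil {
            return err
        }

        // Re-populate the cache from the incoming snapshot.
        for _, kv := range kvs {
            kv.Value.Timestamp = hlc.Timestamp{} // Timestamp is not part of the checksum.
            if err := storage.MVCCPut(
                ctx, batch, keys.StoreCachedSettingsKey(kv.Key), hlc.Timestamp{}, kv.Value,
                storage.MVCCWriteOptions{},
            ); err != nil {
                return err
            }
        }
        // Commit as in the existing implementation (unchanged by this series).
        return batch.Commit(false /* sync */)
    }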