From 10e1ea991ce56956ce14da1d24de1f6f7d233b02 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 20 Sep 2023 05:58:31 +0200 Subject: [PATCH] server: fix sync on setting overrides for secondary tenants The previous patch in this area was merely restarting the rangefeed but did not actually wait for the initial update event to be received. This patch fixes it. Release note: None --- .../settingswatcher/settings_watcher.go | 22 +++++++++++++++---- pkg/server/tenantsettingswatcher/watcher.go | 18 +++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index e28b72418ae1..c5c51702617b 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -60,6 +60,9 @@ type SettingsWatcher struct { // inside secondary tenants. It will be uninitialized in a system // tenant. storageClusterVersion clusterversion.ClusterVersion + + // Used by TestingRestart. + updateWait chan struct{} } // testingWatcherKnobs allows the client to inject testing knobs into @@ -85,7 +88,7 @@ func New( stopper *stop.Stopper, storage Storage, // optional ) *SettingsWatcher { - return &SettingsWatcher{ + s := &SettingsWatcher{ clock: clock, codec: codec, settings: settingsToUpdate, @@ -94,6 +97,8 @@ func New( dec: MakeRowDecoder(codec), storage: storage, } + s.mu.updateWait = make(chan struct{}) + return s } // NewWithOverrides constructs a new SettingsWatcher which allows external @@ -141,6 +146,9 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { initialScan.done = true close(initialScan.ch) } + // Used by TestingRestart(). + close(s.mu.updateWait) + s.mu.updateWait = make(chan struct{}) } s.mu.values = make(map[settings.InternalKey]settingsValue) @@ -248,9 +256,15 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { } } -func (w *SettingsWatcher) TestingRestart() { - if w.rfc != nil { - w.rfc.TestingRestart() +// TestingRestart restarts the rangefeeds and waits for the initial +// update after the rangefeed update to be processed. +func (s *SettingsWatcher) TestingRestart() { + if s.rfc != nil { + s.mu.Lock() + waitCh := s.mu.updateWait + s.mu.Unlock() + s.rfc.TestingRestart() + <-waitCh } } diff --git a/pkg/server/tenantsettingswatcher/watcher.go b/pkg/server/tenantsettingswatcher/watcher.go index 9560f4c5b58e..83aadc5effa6 100644 --- a/pkg/server/tenantsettingswatcher/watcher.go +++ b/pkg/server/tenantsettingswatcher/watcher.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" ) @@ -64,6 +65,11 @@ type Watcher struct { // rfc provides access to the underlying rangefeedcache.Watcher for // testing. rfc *rangefeedcache.Watcher + mu struct { + syncutil.Mutex + // Used by TestingRestart. + updateWait chan struct{} + } } // New constructs a new Watcher. @@ -78,6 +84,7 @@ func New( dec: MakeRowDecoder(), } w.store.Init() + w.mu.updateWait = make(chan struct{}) return w } @@ -162,6 +169,11 @@ func (w *Watcher) startRangeFeed( initialScan.done = true close(initialScan.ch) } + // Used by TestingRestart(). + w.mu.Lock() + defer w.mu.Unlock() + close(w.mu.updateWait) + w.mu.updateWait = make(chan struct{}) } } @@ -220,9 +232,15 @@ func (w *Watcher) WaitForStart(ctx context.Context) error { } } +// TestingRestart restarts the rangefeeds and waits for the initial +// update after the rangefeed update to be processed. func (w *Watcher) TestingRestart() { if w.rfc != nil { + w.mu.Lock() + waitCh := w.mu.updateWait + w.mu.Unlock() w.rfc.TestingRestart() + <-waitCh } }