From 5eee2d1f82683070542efbf6c68bcdafdb77e452 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 11 Jan 2022 01:31:05 -0500 Subject: [PATCH] server/settingswatcher: handle rangefeedcache restarts This is an improvement over what the library provided before. It previously assumed that the rangefeed would never encounter a permanent error after the initial scan. There are cases where such errors can occur. This commit defers to the rangefeedcache.Start loop to handle them. Release note: None --- pkg/server/settingswatcher/BUILD.bazel | 2 + .../settingswatcher/settings_watcher.go | 52 +++++---- .../settings_watcher_external_test.go | 12 +- .../settingswatcher/settings_watcher_test.go | 104 ++++++++++++++++++ 4 files changed, 135 insertions(+), 35 deletions(-) create mode 100644 pkg/server/settingswatcher/settings_watcher_test.go diff --git a/pkg/server/settingswatcher/BUILD.bazel b/pkg/server/settingswatcher/BUILD.bazel index ea5040ee599d..0798029adcb3 100644 --- a/pkg/server/settingswatcher/BUILD.bazel +++ b/pkg/server/settingswatcher/BUILD.bazel @@ -42,6 +42,7 @@ go_test( "row_decoder_external_test.go", "row_decoder_test.go", "settings_watcher_external_test.go", + "settings_watcher_test.go", ], embed = [":settingswatcher"], deps = [ @@ -49,6 +50,7 @@ go_test( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/rangefeed:with-mocks", + "//pkg/kv/kvclient/rangefeed/rangefeedcache", "//pkg/roachpb:with-mocks", "//pkg/security", "//pkg/security/securitytest", diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index 9b17a7fe2a52..6aba7e99ab95 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -50,6 +50,10 @@ type SettingsWatcher struct { values map[string]RawValue overrides map[string]RawValue } + + // testingWatcherKnobs allows the client to inject testing knobs into + // the underlying rangefeedcache.Watcher. + testingWatcherKnobs *rangefeedcache.TestingKnobs } // Storage is used to write a snapshot of KVs out to disk for use upon restart. @@ -103,14 +107,22 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { EndKey: settingsTablePrefix.PrefixEnd(), } u := s.settings.MakeUpdater() - initialScanDone := make(chan struct{}) - var initialScanErr error + var initialScan = struct { + ch chan struct{} + done bool + err error + }{ + ch: make(chan struct{}), + } noteUpdate := func(update rangefeedcache.Update) { s.mu.Lock() defer s.mu.Unlock() if update.Type == rangefeedcache.CompleteUpdate { u.ResetRemaining(ctx) - close(initialScanDone) + if !initialScan.done { + initialScan.done = true + close(initialScan.ch) + } } } @@ -146,12 +158,6 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { // number thought ought to be big enough. Note that if there is no underlying // storage, we'll never produce any events in s.handleKV() so we can use a // bufferSize of 0. - // - // TODO(ajwerner): Use rangefeedcache.Start and run the cache in a loop - // to deal with buffer overflows. On a fresh scan, there ought not be - // more settings values than there exists cluster settings, though we - // need to deal with the fact that new settings can be added in the next - // version and we can have retired values we don't know about. var bufferSize int if s.storage != nil { bufferSize = settings.MaxSettings * 3 @@ -187,28 +193,26 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { noteUpdate(update) maybeUpdateSnapshot(update) }, - nil, // knobs + s.testingWatcherKnobs, ) - if err := s.stopper.RunAsyncTask(ctx, "setting", func(ctx context.Context) { - ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx) - defer cancel() - err := c.Run(ctx) - select { - case <-initialScanDone: - default: - initialScanErr = err - close(initialScanDone) + + // Kick off the rangefeedcache which will retry until the stopper stops. + if err := rangefeedcache.Start(ctx, s.stopper, c, func(err error) { + if !initialScan.done { + initialScan.err = err + initialScan.done = true + close(initialScan.ch) + } else { + u = s.settings.MakeUpdater() } }); err != nil { return err // we're shutting down } + // Wait for the initial scan before returning. select { - case <-initialScanDone: - if initialScanErr != nil { - return initialScanErr - } - return nil + case <-initialScan.ch: + return initialScan.err case <-s.stopper.ShouldQuiesce(): return errors.Wrap(stop.ErrUnavailable, "failed to retrieve initial cluster settings") diff --git a/pkg/server/settingswatcher/settings_watcher_external_test.go b/pkg/server/settingswatcher/settings_watcher_external_test.go index 37eef666400e..1790537b8621 100644 --- a/pkg/server/settingswatcher/settings_watcher_external_test.go +++ b/pkg/server/settingswatcher/settings_watcher_external_test.go @@ -98,17 +98,7 @@ func TestSettingWatcherOnTenant(t *testing.T) { return len(rows) } checkSettingsValuesMatch := func(a, b *cluster.Settings) error { - for _, k := range settings.Keys(false /* forSystemTenant */) { - s, ok := settings.Lookup(k, settings.LookupForLocalAccess, false /* forSystemTenant */) - require.True(t, ok) - if s.Class() == settings.SystemOnly { - continue - } - if av, bv := s.String(&a.SV), s.String(&b.SV); av != bv { - return errors.Errorf("values do not match for %s: %s != %s", k, av, bv) - } - } - return nil + return settingswatcher.CheckSettingsValuesMatch(t, a, b) } checkStoredValuesMatch := func(expected []roachpb.KeyValue) error { got := filterSystemOnly(getSourceClusterRows()) diff --git a/pkg/server/settingswatcher/settings_watcher_test.go b/pkg/server/settingswatcher/settings_watcher_test.go new file mode 100644 index 000000000000..f57f7145a70b --- /dev/null +++ b/pkg/server/settingswatcher/settings_watcher_test.go @@ -0,0 +1,104 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package settingswatcher + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// Test that an error occurring during processing of the +// rangefeedcache.Watcher can be recovered after a permanent +// rangefeed failure. +func TestOverflowRestart(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + sideSettings := cluster.MakeTestingClusterSettings() + w := New( + s.Clock(), + s.ExecutorConfig().(sql.ExecutorConfig).Codec, + sideSettings, + s.RangeFeedFactory().(*rangefeed.Factory), + s.Stopper(), + nil, + ) + var exitCalled int64 // accessed with atomics + errCh := make(chan error) + w.testingWatcherKnobs = &rangefeedcache.TestingKnobs{ + PreExit: func() { atomic.AddInt64(&exitCalled, 1) }, + ErrorInjectionCh: errCh, + } + require.NoError(t, w.Start(ctx)) + tdb := sqlutils.MakeSQLRunner(sqlDB) + // Shorten the closed timestamp duration as a cheeky way to check the + // checkpointing code while also speeding up the test. + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10 ms'") + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10 ms'") + + checkSettings := func() { + testutils.SucceedsSoon(t, func() error { + return CheckSettingsValuesMatch(t, s.ClusterSettings(), sideSettings) + }) + } + checkExits := func(exp int64) { + require.Equal(t, exp, atomic.LoadInt64(&exitCalled)) + } + waitForExits := func(exp int64) { + require.Eventually(t, func() bool { + return atomic.LoadInt64(&exitCalled) == exp + }, time.Minute, time.Millisecond) + } + + checkSettings() + tdb.Exec(t, "SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget = '1m'") + checkSettings() + checkExits(0) + errCh <- errors.New("boom") + waitForExits(1) + tdb.Exec(t, "SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget = '2s'") + checkSettings() + checkExits(1) +} + +// CheckSettingsValuesMatch is a test helper function to return an error when +// two settings do not match. It generally gets used with SucceeedsSoon. +func CheckSettingsValuesMatch(t *testing.T, a, b *cluster.Settings) error { + for _, k := range settings.Keys(false /* forSystemTenant */) { + s, ok := settings.Lookup(k, settings.LookupForLocalAccess, false /* forSystemTenant */) + require.True(t, ok) + if s.Class() == settings.SystemOnly { + continue + } + if av, bv := s.String(&a.SV), s.String(&b.SV); av != bv { + return errors.Errorf("values do not match for %s: %s != %s", k, av, bv) + } + } + return nil +}