Skip to content

Commit

Permalink
server/settingswatcher: handle rangefeedcache restarts
Browse files Browse the repository at this point in the history
This is an improvement over what the library provided before. It previously
assumed that the rangefeed would never encounter a permanent error after the
initial scan. There are cases where such errors can occur. This commit defers
to the rangefeedcache.Start loop to handle them.

Release note: None
  • Loading branch information
ajwerner committed Jan 13, 2022
1 parent 352f790 commit 5eee2d1
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 35 deletions.
2 changes: 2 additions & 0 deletions pkg/server/settingswatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ go_test(
"row_decoder_external_test.go",
"row_decoder_test.go",
"settings_watcher_external_test.go",
"settings_watcher_test.go",
],
embed = [":settingswatcher"],
deps = [
"//pkg/base",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed:with-mocks",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
Expand Down
52 changes: 28 additions & 24 deletions pkg/server/settingswatcher/settings_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type SettingsWatcher struct {
values map[string]RawValue
overrides map[string]RawValue
}

// testingWatcherKnobs allows the client to inject testing knobs into
// the underlying rangefeedcache.Watcher.
testingWatcherKnobs *rangefeedcache.TestingKnobs
}

// Storage is used to write a snapshot of KVs out to disk for use upon restart.
Expand Down Expand Up @@ -103,14 +107,22 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
EndKey: settingsTablePrefix.PrefixEnd(),
}
u := s.settings.MakeUpdater()
initialScanDone := make(chan struct{})
var initialScanErr error
var initialScan = struct {
ch chan struct{}
done bool
err error
}{
ch: make(chan struct{}),
}
noteUpdate := func(update rangefeedcache.Update) {
s.mu.Lock()
defer s.mu.Unlock()
if update.Type == rangefeedcache.CompleteUpdate {
u.ResetRemaining(ctx)
close(initialScanDone)
if !initialScan.done {
initialScan.done = true
close(initialScan.ch)
}
}
}

Expand Down Expand Up @@ -146,12 +158,6 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
// number thought ought to be big enough. Note that if there is no underlying
// storage, we'll never produce any events in s.handleKV() so we can use a
// bufferSize of 0.
//
// TODO(ajwerner): Use rangefeedcache.Start and run the cache in a loop
// to deal with buffer overflows. On a fresh scan, there ought not be
// more settings values than there exists cluster settings, though we
// need to deal with the fact that new settings can be added in the next
// version and we can have retired values we don't know about.
var bufferSize int
if s.storage != nil {
bufferSize = settings.MaxSettings * 3
Expand Down Expand Up @@ -187,28 +193,26 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
noteUpdate(update)
maybeUpdateSnapshot(update)
},
nil, // knobs
s.testingWatcherKnobs,
)
if err := s.stopper.RunAsyncTask(ctx, "setting", func(ctx context.Context) {
ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx)
defer cancel()
err := c.Run(ctx)
select {
case <-initialScanDone:
default:
initialScanErr = err
close(initialScanDone)

// Kick off the rangefeedcache which will retry until the stopper stops.
if err := rangefeedcache.Start(ctx, s.stopper, c, func(err error) {
if !initialScan.done {
initialScan.err = err
initialScan.done = true
close(initialScan.ch)
} else {
u = s.settings.MakeUpdater()
}
}); err != nil {
return err // we're shutting down
}

// Wait for the initial scan before returning.
select {
case <-initialScanDone:
if initialScanErr != nil {
return initialScanErr
}
return nil
case <-initialScan.ch:
return initialScan.err

case <-s.stopper.ShouldQuiesce():
return errors.Wrap(stop.ErrUnavailable, "failed to retrieve initial cluster settings")
Expand Down
12 changes: 1 addition & 11 deletions pkg/server/settingswatcher/settings_watcher_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,7 @@ func TestSettingWatcherOnTenant(t *testing.T) {
return len(rows)
}
checkSettingsValuesMatch := func(a, b *cluster.Settings) error {
for _, k := range settings.Keys(false /* forSystemTenant */) {
s, ok := settings.Lookup(k, settings.LookupForLocalAccess, false /* forSystemTenant */)
require.True(t, ok)
if s.Class() == settings.SystemOnly {
continue
}
if av, bv := s.String(&a.SV), s.String(&b.SV); av != bv {
return errors.Errorf("values do not match for %s: %s != %s", k, av, bv)
}
}
return nil
return settingswatcher.CheckSettingsValuesMatch(t, a, b)
}
checkStoredValuesMatch := func(expected []roachpb.KeyValue) error {
got := filterSystemOnly(getSourceClusterRows())
Expand Down
104 changes: 104 additions & 0 deletions pkg/server/settingswatcher/settings_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package settingswatcher

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestOverflowRestart checks that the SettingsWatcher keeps tracking cluster
// settings after an error is injected into its underlying
// rangefeedcache.Watcher, i.e. that the watcher recovers from a permanent
// rangefeed failure that happens after the initial scan.
func TestOverflowRestart(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
	defer srv.Stopper().Stop(ctx)

	// mirror is a separate settings container which the watcher under test
	// populates; it is compared against the server's own settings below.
	mirror := cluster.MakeTestingClusterSettings()
	watcher := New(
		srv.Clock(),
		srv.ExecutorConfig().(sql.ExecutorConfig).Codec,
		mirror,
		srv.RangeFeedFactory().(*rangefeed.Factory),
		srv.Stopper(),
		nil,
	)

	// Wire up the testing knobs before starting the watcher: exits counts
	// PreExit invocations and injectCh is handed over as the error injection
	// channel.
	var exits int64 // accessed with atomics
	injectCh := make(chan error)
	watcher.testingWatcherKnobs = &rangefeedcache.TestingKnobs{
		PreExit:          func() { atomic.AddInt64(&exits, 1) },
		ErrorInjectionCh: injectCh,
	}
	require.NoError(t, watcher.Start(ctx))

	runner := sqlutils.MakeSQLRunner(db)
	// Lowering the closed timestamp intervals is a cheeky way to exercise the
	// checkpointing code, and it also makes the test run faster.
	runner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10 ms'")
	runner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10 ms'")

	// numExits reads the PreExit counter atomically.
	numExits := func() int64 {
		return atomic.LoadInt64(&exits)
	}
	// requireSettingsConverge waits until the mirrored settings agree with
	// the settings of the server.
	requireSettingsConverge := func() {
		testutils.SucceedsSoon(t, func() error {
			return CheckSettingsValuesMatch(t, srv.ClusterSettings(), mirror)
		})
	}
	// requireExits asserts that PreExit has been called exactly want times.
	requireExits := func(want int64) {
		require.Equal(t, want, numExits())
	}
	// awaitExits polls until PreExit has been called exactly want times.
	awaitExits := func(want int64) {
		require.Eventually(t, func() bool {
			return numExits() == want
		}, time.Minute, time.Millisecond)
	}

	// Before any failure, updates flow through and nothing has exited.
	requireSettingsConverge()
	runner.Exec(t, "SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget = '1m'")
	requireSettingsConverge()
	requireExits(0)

	// Inject an error and wait for the PreExit knob to fire once.
	injectCh <- errors.New("boom")
	awaitExits(1)

	// After the failure, new updates must still be observed, and no further
	// exits should have happened.
	runner.Exec(t, "SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget = '2s'")
	requireSettingsConverge()
	requireExits(1)
}

// CheckSettingsValuesMatch is a test helper which returns an error when the
// two settings containers disagree on the string value of any setting whose
// class is not settings.SystemOnly. It returns nil when every compared setting
// matches. It generally gets used with SucceedsSoon.
//
// The test is failed immediately (via require) if a key returned by
// settings.Keys cannot be looked up.
func CheckSettingsValuesMatch(t *testing.T, a, b *cluster.Settings) error {
	// Mark this function as a helper so that failures reported through t are
	// attributed to the calling test rather than to this function.
	t.Helper()
	for _, k := range settings.Keys(false /* forSystemTenant */) {
		s, ok := settings.Lookup(k, settings.LookupForLocalAccess, false /* forSystemTenant */)
		require.True(t, ok)
		// SystemOnly settings are excluded from the comparison.
		if s.Class() == settings.SystemOnly {
			continue
		}
		if av, bv := s.String(&a.SV), s.String(&b.SV); av != bv {
			return errors.Errorf("values do not match for %s: %s != %s", k, av, bv)
		}
	}
	return nil
}

0 comments on commit 5eee2d1

Please sign in to comment.