Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server,settingswatcher: fix the local persisted cache #111475

Merged
merged 3 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,8 +2143,16 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
return err
}

if err := s.node.tenantSettingsWatcher.Start(workersCtx, s.sqlServer.execCfg.SystemTableIDResolver); err != nil {
return errors.Wrap(err, "failed to initialize the tenant settings watcher")
startSettingsWatcher := true
if serverKnobs := s.cfg.TestingKnobs.Server; serverKnobs != nil {
if serverKnobs.(*TestingKnobs).DisableSettingsWatcher {
startSettingsWatcher = false
}
}
if startSettingsWatcher {
if err := s.node.tenantSettingsWatcher.Start(workersCtx, s.sqlServer.execCfg.SystemTableIDResolver); err != nil {
return errors.Wrap(err, "failed to initialize the tenant settings watcher")
}
}
if err := s.tenantCapabilitiesWatcher.Start(ctx); err != nil {
return errors.Wrap(err, "initializing tenant capabilities")
Expand Down
16 changes: 15 additions & 1 deletion pkg/server/settings_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,23 @@ var _ settingswatcher.Storage = (*settingsCacheWriter)(nil)
func storeCachedSettingsKVs(ctx context.Context, eng storage.Engine, kvs []roachpb.KeyValue) error {
batch := eng.NewBatch()
defer batch.Close()

// Remove previous entries -- they are now stale.
if _, _, _, err := storage.MVCCDeleteRange(ctx, batch,
keys.LocalStoreCachedSettingsKeyMin,
keys.LocalStoreCachedSettingsKeyMax,
0 /* no limit */, hlc.Timestamp{}, storage.MVCCWriteOptions{}, false /* returnKeys */); err != nil {
return err
}

// Now we can populate the cache with new entries.
for _, kv := range kvs {
kv.Value.Timestamp = hlc.Timestamp{} // nb: Timestamp is not part of checksum
cachedSettingsKey := keys.StoreCachedSettingsKey(kv.Key)
// A new value is added, or an existing value is updated.
log.VEventf(ctx, 1, "storing cached setting: %s -> %+v", cachedSettingsKey, kv.Value)
if err := storage.MVCCPut(
ctx, batch, keys.StoreCachedSettingsKey(kv.Key), hlc.Timestamp{}, kv.Value, storage.MVCCWriteOptions{},
ctx, batch, cachedSettingsKey, hlc.Timestamp{}, kv.Value, storage.MVCCWriteOptions{},
); err != nil {
return err
}
Expand Down Expand Up @@ -151,6 +164,7 @@ func initializeCachedSettings(
" skipping settings updates.")
}
settingKey := settings.InternalKey(settingKeyS)
log.VEventf(ctx, 1, "loaded cached setting: %s -> %+v", settingKey, val)
if err := updater.Set(ctx, settingKey, val); err != nil {
log.Warningf(ctx, "setting %q to %v failed: %+v", settingKey, val, err)
}
Expand Down
59 changes: 59 additions & 0 deletions pkg/server/settings_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package server
import (
"context"
"fmt"
"strings"
"testing"
"time"

Expand All @@ -24,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -146,3 +148,60 @@ Actual: %+v
return nil
})
}

// TestCachedSettingDeletionIsPersisted verifies that the settings cache
// persisted on the store follows deletions: after a customized cluster
// setting is RESET, its entry must eventually disappear from the cache.
func TestCachedSettingDeletionIsPersisted(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	ts, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
		DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
	})
	defer ts.Stopper().Stop(ctx)
	runner := sqlutils.MakeSQLRunner(sqlDB)

	// settingIsCached loads the cached settings KVs from store 1 and reports
	// whether any of their keys contains the given setting name.
	settingIsCached := func(name string) bool {
		store, err := ts.GetStores().(*kvserver.Stores).GetStore(1)
		require.NoError(t, err)
		cachedKVs, err := loadCachedSettingsKVs(ctx, store.TODOEngine())
		require.NoError(t, err)
		for i := range cachedKVs {
			if strings.Contains(string(cachedKVs[i].Key), name) {
				return true
			}
		}
		return false
	}

	// Tighten the closed timestamp and rangefeed refresh intervals so the
	// test runs faster.
	st := ts.ClusterSettings()
	sv := &st.SV
	closedts.TargetDuration.Override(ctx, sv, 10*time.Millisecond)
	closedts.SideTransportCloseInterval.Override(ctx, sv, 10*time.Millisecond)
	kvserver.RangeFeedRefreshInterval.Override(ctx, sv, 10*time.Millisecond)

	// Customize a setting. It only reaches the store once the settings
	// watcher catches up with the rangefeed, which might take a while, hence
	// the retry loop.
	runner.Exec(t, `SET CLUSTER SETTING ui.display_timezone = 'America/New_York'`)
	testutils.SucceedsSoon(t, func() error {
		if settingIsCached(`ui.display_timezone`) {
			return nil
		}
		return errors.New("cached setting not found")
	})

	// Reset the setting; the cached entry must eventually be removed from the
	// store as well.
	runner.Exec(t, `RESET CLUSTER SETTING ui.display_timezone`)
	testutils.SucceedsSoon(t, func() error {
		if settingIsCached(`ui.display_timezone`) {
			return errors.New("cached setting was still found")
		}
		return nil
	})
}
48 changes: 25 additions & 23 deletions pkg/server/settingswatcher/settings_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ type SettingsWatcher struct {
f *rangefeed.Factory
stopper *stop.Stopper
dec RowDecoder
storage Storage

// storage is used to persist a local cache of the setting
// overrides, for use when a node starts up before KV is ready.
storage Storage
// snapshot is what goes into the local cache.
snapshot []roachpb.KeyValue

overridesMonitor OverridesMonitor

Expand Down Expand Up @@ -194,24 +199,7 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
if s.storage != nil {
bufferSize = settings.MaxSettings * 3
}
var snapshot []roachpb.KeyValue // used with storage
maybeUpdateSnapshot := func(update rangefeedcache.Update) {
// Only record the update to the buffer if we're writing to storage.
if s.storage == nil ||
// and the update has some new information to write.
(update.Type == rangefeedcache.IncrementalUpdate && len(update.Events) == 0) {
return
}
eventKVs := rangefeedbuffer.EventsToKVs(update.Events,
rangefeedbuffer.RangeFeedValueEventToKV)
switch update.Type {
case rangefeedcache.CompleteUpdate:
snapshot = eventKVs
case rangefeedcache.IncrementalUpdate:
snapshot = rangefeedbuffer.MergeKVs(snapshot, eventKVs)
}
s.storage.SnapshotKVs(ctx, snapshot)
}

c := rangefeedcache.NewWatcher(
"settings-watcher",
s.clock, s.f,
Expand All @@ -223,7 +211,6 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
},
func(ctx context.Context, update rangefeedcache.Update) {
noteUpdate(update)
maybeUpdateSnapshot(update)
},
s.testingWatcherKnobs,
)
Expand Down Expand Up @@ -270,11 +257,13 @@ func (s *SettingsWatcher) TestingRestart() {
func (s *SettingsWatcher) handleKV(
ctx context.Context, kv *kvpb.RangeFeedValue,
) rangefeedbuffer.Event {
var alloc tree.DatumAlloc
settingKeyS, val, tombstone, err := s.dec.DecodeRow(roachpb.KeyValue{
rkv := roachpb.KeyValue{
Key: kv.Key,
Value: kv.Value,
}, &alloc)
}

var alloc tree.DatumAlloc
settingKeyS, val, tombstone, err := s.dec.DecodeRow(rkv, &alloc)
if err != nil {
// This should never happen: the rangefeed should only ever deliver valid SQL rows.
err = errors.NewAssertionErrorWithWrappedErrf(err, "failed to decode settings row %v", kv.Key)
Expand All @@ -295,6 +284,19 @@ func (s *SettingsWatcher) handleKV(
}
}

// Ensure that the update is persisted to the local cache before we
// propagate the value to the in-RAM store. This ensures the latest
// value will be reloaded from the cache if the service is
// interrupted abruptly after the new value is seen by a client.
//
// Note: it is because we really want the cache to be updated before
// the in-RAM store that we do this here instead of batching the
// updates in the onUpdate rangefeed function.
if s.storage != nil {
s.snapshot = rangefeedbuffer.MergeKVs(s.snapshot, []roachpb.KeyValue{rkv})
s.storage.SnapshotKVs(ctx, s.snapshot)
}

s.maybeSet(ctx, settingKey, settingsValue{
val: val,
ts: kv.Value.Timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,14 @@ type fakeStorage struct {
// SnapshotKVs records the given snapshot in the fake storage and counts the
// write. Entries whose value reports !IsPresent() are dropped before the
// snapshot is stored, so f.kvs only ever holds the non-deletion entries.
// The receiver's lock is held for the whole update.
func (f *fakeStorage) SnapshotKVs(ctx context.Context, kvs []roachpb.KeyValue) {
	f.Lock()
	defer f.Unlock()
	// Filter into a fresh slice so the caller's slice is left untouched.
	nonDeletions := make([]roachpb.KeyValue, 0, len(kvs))
	for _, kv := range kvs {
		if kv.Value.IsPresent() {
			nonDeletions = append(nonDeletions, kv)
		}
	}
	f.kvs = nonDeletions
	f.numWrites++
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/server/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ type TestingKnobs struct {
// DialNodeCallback is used to mock dial errors when dialing a node. It is
// invoked by the dialNode method of server.serverIterator.
DialNodeCallback func(ctx context.Context, nodeID roachpb.NodeID) error

// DisableSettingsWatcher disables the watcher that monitors updates
// to system.settings.
DisableSettingsWatcher bool
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/settings/integration_tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
74 changes: 74 additions & 0 deletions pkg/settings/integration_tests/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

const strKey = "testing.str"
Expand Down Expand Up @@ -286,3 +288,75 @@ func TestSettingsShowAll(t *testing.T) {
t.Fatalf("show all did not find the test keys: %q", rows)
}
}

// TestSettingsPersistenceEndToEnd restarts a test server several times on
// top of the same sticky in-memory store and checks that:
//   - a customized cluster setting is still visible after a restart in which
//     the settings watcher is disabled, and
//   - after a RESET, the original value is reported both immediately and
//     after a further restart.
func TestSettingsPersistenceEndToEnd(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	// The server is restarted below while its storage is expected to
	// survive; a sticky VFS registry is defined for that purpose.
	knobs := &server.TestingKnobs{
		StickyVFSRegistry: server.NewStickyVFSRegistry(),
	}
	args := base.TestServerArgs{
		DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
		StoreSpecs: []base.StoreSpec{
			{InMemory: true, StickyVFSID: "1"},
		},
		Knobs: base.TestingKnobs{
			Server: knobs,
		},
	}

	ts, sqlDB, _ := serverutils.StartServer(t, args)
	defer ts.Stopper().Stop(ctx)
	db := sqlutils.MakeSQLRunner(sqlDB)

	// restart stops the current server, sets the DisableSettingsWatcher knob
	// as requested, and starts a new server with the same arguments. The
	// caller is responsible for deferring the stop of the new server.
	restart := func(disableWatcher bool) {
		ts.Stopper().Stop(ctx)
		knobs.DisableSettingsWatcher = disableWatcher
		ts, sqlDB, _ = serverutils.StartServer(t, args)
		db = sqlutils.MakeSQLRunner(sqlDB)
	}

	// The custom value must differ from the setting's current value, so
	// assert that up front.
	const customValue = `something`

	setting, _ := settings.LookupForLocalAccessByKey("cluster.organization", true)
	orgSetting := setting.(*settings.StringSetting)
	st := ts.ClusterSettings()
	require.NotEqual(t, orgSetting.Get(&st.SV), customValue)
	origValue := db.QueryStr(t, `SHOW CLUSTER SETTING cluster.organization`)[0][0]

	// Customize the setting and remember what SHOW reports for it.
	db.Exec(t, `SET CLUSTER SETTING cluster.organization = $1`, customValue)
	newValue := db.QueryStr(t, `SHOW CLUSTER SETTING cluster.organization`)[0][0]

	// Restart with the settings watcher disabled, to ensure that only the
	// value loaded from the local persisted cache is used; the customization
	// must be preserved.
	restart(true /* disableWatcher */)
	defer ts.Stopper().Stop(ctx)
	db.CheckQueryResults(t, `SHOW CLUSTER SETTING cluster.organization`, [][]string{{newValue}})

	// Restart with the watcher enabled to make the setting writable again.
	restart(false /* disableWatcher */)
	defer ts.Stopper().Stop(ctx)

	// Reset the setting, then check the original value is restored.
	db.Exec(t, `RESET CLUSTER SETTING cluster.organization`)
	db.CheckQueryResults(t, `SHOW CLUSTER SETTING cluster.organization`, [][]string{{origValue}})

	// Restart once more; the original value must still be there.
	restart(false /* disableWatcher */)
	defer ts.Stopper().Stop(ctx)
	db.CheckQueryResults(t, `SHOW CLUSTER SETTING cluster.organization`, [][]string{{origValue}})
}