Skip to content

Commit

Permalink
settingswatcher: write-through to the persisted cache
Browse files Browse the repository at this point in the history
Prior to this patch, the rangefeed watcher over `system.settings` was
updating the in-RAM value store before it propagated the updates to
the persisted local cache.

In fact, the update to the persisted local cache was lagging quite a
bit behind, because the rangefeed watcher would buffer updates and
only flush them after a while.

As a result, the following sequence was possible:

1. client updates a cluster setting.
2. server is immediately shut down. The persisted cache
   has not been updated yet.
3. server is restarted. For a short while (until the settings watcher
   has caught up), the old version of the setting remains active.

This recall of ghost values of a setting was simply a bug. This patch
fixes that, by ensuring that the persisted cache is written through
before the in-RAM value store.

By doing this, we give up on batching updates to the persisted local
store. This is deemed acceptable because cluster settings are not
updated frequently.

Release note: None
  • Loading branch information
knz committed Sep 29, 2023
1 parent 81ebdaf commit c5af624
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 24 deletions.
12 changes: 10 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,8 +2143,16 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
return err
}

if err := s.node.tenantSettingsWatcher.Start(workersCtx, s.sqlServer.execCfg.SystemTableIDResolver); err != nil {
return errors.Wrap(err, "failed to initialize the tenant settings watcher")
startSettingsWatcher := true
if serverKnobs := s.cfg.TestingKnobs.Server; serverKnobs != nil {
if serverKnobs.(*TestingKnobs).DisableSettingsWatcher {
startSettingsWatcher = false
}
}
if startSettingsWatcher {
if err := s.node.tenantSettingsWatcher.Start(workersCtx, s.sqlServer.execCfg.SystemTableIDResolver); err != nil {
return errors.Wrap(err, "failed to initialize the tenant settings watcher")
}
}
if err := s.tenantCapabilitiesWatcher.Start(ctx); err != nil {
return errors.Wrap(err, "initializing tenant capabilities")
Expand Down
40 changes: 18 additions & 22 deletions pkg/server/settingswatcher/settings_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
},
func(ctx context.Context, update rangefeedcache.Update) {
noteUpdate(update)
s.maybeUpdateSnapshot(ctx, update)
},
s.testingWatcherKnobs,
)
Expand Down Expand Up @@ -243,24 +242,6 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
}
}

func (s *SettingsWatcher) maybeUpdateSnapshot(ctx context.Context, update rangefeedcache.Update) {
// Only record the update to the buffer if we're writing to storage.
if s.storage == nil ||
// and the update has some new information to write.
(update.Type == rangefeedcache.IncrementalUpdate && len(update.Events) == 0) {
return
}
eventKVs := rangefeedbuffer.EventsToKVs(update.Events,
rangefeedbuffer.RangeFeedValueEventToKV)
switch update.Type {
case rangefeedcache.CompleteUpdate:
s.snapshot = eventKVs
case rangefeedcache.IncrementalUpdate:
s.snapshot = rangefeedbuffer.MergeKVs(s.snapshot, eventKVs)
}
s.storage.SnapshotKVs(ctx, s.snapshot)
}

// TestingRestart restarts the rangefeeds and waits for the initial
// update after the rangefeed update to be processed.
func (s *SettingsWatcher) TestingRestart() {
Expand All @@ -276,11 +257,13 @@ func (s *SettingsWatcher) TestingRestart() {
func (s *SettingsWatcher) handleKV(
ctx context.Context, kv *kvpb.RangeFeedValue,
) rangefeedbuffer.Event {
var alloc tree.DatumAlloc
settingKeyS, val, tombstone, err := s.dec.DecodeRow(roachpb.KeyValue{
rkv := roachpb.KeyValue{
Key: kv.Key,
Value: kv.Value,
}, &alloc)
}

var alloc tree.DatumAlloc
settingKeyS, val, tombstone, err := s.dec.DecodeRow(rkv, &alloc)
if err != nil {
// This should never happen: the rangefeed should only ever deliver valid SQL rows.
err = errors.NewAssertionErrorWithWrappedErrf(err, "failed to decode settings row %v", kv.Key)
Expand All @@ -301,6 +284,19 @@ func (s *SettingsWatcher) handleKV(
}
}

// Ensure that the update is persisted to the local cache before we
// propagate the value to the in-RAM store. This ensures the latest
// value will be reloaded from the cache if the service is
// interrupted abruptly after the new value is seen by a client.
//
// Note: it is because we really want the cache to be updated before
// the in-RAM store that we do this here instead of batching the
// updates in the onUpdate rangefeed function.
if s.storage != nil {
s.snapshot = rangefeedbuffer.MergeKVs(s.snapshot, []roachpb.KeyValue{rkv})
s.storage.SnapshotKVs(ctx, s.snapshot)
}

s.maybeSet(ctx, settingKey, settingsValue{
val: val,
ts: kv.Value.Timestamp,
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ type TestingKnobs struct {
// DialNodeCallback is used to mock dial errors when dialing a node. It is
// invoked by the dialNode method of server.serverIterator.
DialNodeCallback func(ctx context.Context, nodeID roachpb.NodeID) error

// DisableSettingsWatcher disables the watcher that monitors updates
// to system.settings.
DisableSettingsWatcher bool
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/settings/integration_tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
74 changes: 74 additions & 0 deletions pkg/settings/integration_tests/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

const strKey = "testing.str"
Expand Down Expand Up @@ -286,3 +288,75 @@ func TestSettingsShowAll(t *testing.T) {
t.Fatalf("show all did not find the test keys: %q", rows)
}
}

func TestSettingsPersistenceEndToEnd(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

// We're going to restart the test server, but expecting storage to
// persist. Define a sticky VFS for this purpose.
stickyVFSRegistry := server.NewStickyVFSRegistry()
serverKnobs := &server.TestingKnobs{
StickyVFSRegistry: stickyVFSRegistry,
}
serverArgs := base.TestServerArgs{
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
StoreSpecs: []base.StoreSpec{
{InMemory: true, StickyVFSID: "1"},
},
Knobs: base.TestingKnobs{
Server: serverKnobs,
},
}

ts, sqlDB, _ := serverutils.StartServer(t, serverArgs)
defer ts.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(sqlDB)

// We need a custom value for the cluster setting that's guaranteed
// to be different from the default. So check that it's not equal to
// the default always.
const differentValue = `something`

setting, _ := settings.LookupForLocalAccessByKey("cluster.organization", true)
s := setting.(*settings.StringSetting)
st := ts.ClusterSettings()
require.NotEqual(t, s.Get(&st.SV), differentValue)
origValue := db.QueryStr(t, `SHOW CLUSTER SETTING cluster.organization`)[0][0]

// Customize the setting.
db.Exec(t, `SET CLUSTER SETTING cluster.organization = $1`, differentValue)
newValue := db.QueryStr(t, `SHOW CLUSTER SETTING cluster.organization`)[0][0]

// Restart the server; verify the setting customization is preserved.
// For this we disable the settings watcher, to ensure that
// only the value loaded by the local persisted cache is used.
ts.Stopper().Stop(ctx)
serverKnobs.DisableSettingsWatcher = true
ts, sqlDB, _ = serverutils.StartServer(t, serverArgs)
defer ts.Stopper().Stop(ctx)
db = sqlutils.MakeSQLRunner(sqlDB)

db.CheckQueryResults(t, `SHOW CLUSTER SETTING cluster.organization`, [][]string{{newValue}})

// Restart the server to make the setting writable again.
ts.Stopper().Stop(ctx)
serverKnobs.DisableSettingsWatcher = false
ts, sqlDB, _ = serverutils.StartServer(t, serverArgs)
defer ts.Stopper().Stop(ctx)
db = sqlutils.MakeSQLRunner(sqlDB)

// Reset the setting, then check the original value is restored.
db.Exec(t, `RESET CLUSTER SETTING cluster.organization`)
db.CheckQueryResults(t, `SHOW CLUSTER SETTING cluster.organization`, [][]string{{origValue}})

// Restart the server; verify the original value is still there.
ts.Stopper().Stop(ctx)
ts, sqlDB, _ = serverutils.StartServer(t, serverArgs)
defer ts.Stopper().Stop(ctx)
db = sqlutils.MakeSQLRunner(sqlDB)

db.CheckQueryResults(t, `SHOW CLUSTER SETTING cluster.organization`, [][]string{{origValue}})
}

0 comments on commit c5af624

Please sign in to comment.