Skip to content

Commit

Permalink
Merge #111153
Browse files Browse the repository at this point in the history
111153: settings: simple refactors r=yuzefovich,stevendanna a=knz

See the last 7 commits for details.

Previous in sequence:
- [x] #110789 and #110947 
- [x] #110676
- [x] #111008
- [x] #111145
- [x] #111147
- [x]  #111149
- [x] #111150

Needed for #110758
Epic: CRDB-6671

Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
craig[bot] and knz committed Sep 26, 2023
2 parents 75fecde + 17f42ed commit c704759
Show file tree
Hide file tree
Showing 16 changed files with 158 additions and 76 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ ALL_TESTS = [
"//pkg/server/tenantsettingswatcher:tenantsettingswatcher_test",
"//pkg/server/tracedumper:tracedumper_test",
"//pkg/server:server_test",
"//pkg/settings/integration_tests:integration_tests_test",
"//pkg/settings/lint:lint_test",
"//pkg/settings/rulebasedscanner:rulebasedscanner_test",
"//pkg/settings:settings_test",
Expand Down Expand Up @@ -1597,6 +1598,7 @@ GO_TARGETS = [
"//pkg/server:server",
"//pkg/server:server_test",
"//pkg/settings/cluster:cluster",
"//pkg/settings/integration_tests:integration_tests_test",
"//pkg/settings/lint:lint_test",
"//pkg/settings/rulebasedscanner:rulebasedscanner",
"//pkg/settings/rulebasedscanner:rulebasedscanner_test",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvtenant/setting_overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,10 @@ func (c *connector) processSettingsEvent(
for _, o := range e.Overrides {
if o.Value == (settings.EncodedValue{}) {
// Empty value indicates that the override is removed.
log.VEventf(ctx, 1, "removing %v override for %q", e.Precedence, o.InternalKey)
delete(m, o.InternalKey)
} else {
log.VEventf(ctx, 1, "adding %v override for %q = %q", e.Precedence, o.InternalKey, o.Value.Value)
m[o.InternalKey] = o.Value
}
}
Expand Down
1 change: 0 additions & 1 deletion pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,6 @@ go_test(
"server_systemlog_gc_test.go",
"server_test.go",
"settings_cache_test.go",
"settings_test.go",
"span_stats_server_test.go",
"span_stats_test.go",
"statements_test.go",
Expand Down
8 changes: 4 additions & 4 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2227,7 +2227,7 @@ func (n *Node) TenantSettings(

// Send the setting overrides for one precedence level.
const firstPrecedenceLevel = kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES
allOverrides, allCh := settingsWatcher.GetAllTenantOverrides()
allOverrides, allCh := settingsWatcher.GetAllTenantOverrides(ctx)

// Inject the current storage logical version as an override; as the
// tenant server needs this to start up.
Expand All @@ -2246,7 +2246,7 @@ func (n *Node) TenantSettings(
// Then send the initial setting overrides for the other precedence
// level. This is the payload that will let the tenant client
// connector signal readiness.
tenantOverrides, tenantCh := settingsWatcher.GetTenantOverrides(args.TenantID)
tenantOverrides, tenantCh := settingsWatcher.GetTenantOverrides(ctx, args.TenantID)
if err := sendSettings(kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES, tenantOverrides, false /* incremental */); err != nil {
return err
}
Expand Down Expand Up @@ -2274,7 +2274,7 @@ func (n *Node) TenantSettings(
// All-tenant overrides have changed, send them again.
// TODO(multitenant): We can optimize this by only sending the delta since the last
// update, with Incremental set to true.
allOverrides, allCh = settingsWatcher.GetAllTenantOverrides()
allOverrides, allCh = settingsWatcher.GetAllTenantOverrides(ctx)
if err := sendSettings(kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES, allOverrides, false /* incremental */); err != nil {
return err
}
Expand All @@ -2283,7 +2283,7 @@ func (n *Node) TenantSettings(
// Tenant-specific overrides have changed, send them again.
// TODO(multitenant): We can optimize this by only sending the delta since the last
// update, with Incremental set to true.
tenantOverrides, tenantCh = settingsWatcher.GetTenantOverrides(args.TenantID)
tenantOverrides, tenantCh = settingsWatcher.GetTenantOverrides(ctx, args.TenantID)
if err := sendSettings(kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES, tenantOverrides, false /* incremental */); err != nil {
return err
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/server/settingswatcher/settings_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,6 @@ type settingsValue struct {
tombstone bool
}

const versionSettingKey = "version"

// set the current value of a setting.
func (s *SettingsWatcher) setLocked(
ctx context.Context,
Expand All @@ -361,7 +359,7 @@ func (s *SettingsWatcher) setLocked(
// bootstrap the initial cluster version on tenant startup. In all other
// instances, this code should no-op (either because we're in the system
// tenant, or because the new version <= old version).
if key == versionSettingKey && !s.codec.ForSystemTenant() {
if key == clusterversion.KeyVersionSetting && !s.codec.ForSystemTenant() {
var newVersion clusterversion.ClusterVersion
oldVersion := s.settings.Version.ActiveVersionOrEmpty(ctx)
if err := protoutil.Unmarshal([]byte(val.Value), &newVersion); err != nil {
Expand Down Expand Up @@ -409,7 +407,7 @@ func (s *SettingsWatcher) updateOverrides(ctx context.Context) (updateCh <-chan
defer s.mu.Unlock()

for key, val := range newOverrides {
if key == versionSettingKey {
if key == clusterversion.KeyVersionSetting {
var newVersion clusterversion.ClusterVersion
if err := protoutil.Unmarshal([]byte(val.Value), &newVersion); err != nil {
log.Warningf(ctx, "ignoring invalid cluster version: %s - %v\n"+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func TestStaleRowsDoNotCauseSettingsToRegress(t *testing.T) {
// The tenant prefix, if one exists, will have been stripped from the
// key.
getSettingKVForFakeSetting := func(t *testing.T) roachpb.KeyValue {
codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec
codec := s.Codec()
k := codec.TablePrefix(keys.SettingsTableID)
rows, err := s.DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */)
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/tenantsettingswatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ go_library(
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/startup",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
)

Expand Down
46 changes: 34 additions & 12 deletions pkg/server/tenantsettingswatcher/overrides_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@
package tenantsettingswatcher

import (
"context"
"sort"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// overridesStore is the data structure that maintains all the tenant overrides
Expand Down Expand Up @@ -46,15 +51,26 @@ type overridesStore struct {
// tenantOverrides stores the current overrides for a tenant (or the current
// all-tenant overrides). It is an immutable data structure.
type tenantOverrides struct {
// overrides, ordered by Name.
// overrides, ordered by InternalKey.
overrides []kvpb.TenantSetting

// changeCh is a channel that is closed when the tenant overrides change (in
// which case a new tenantOverrides object will contain the updated settings).
changeCh chan struct{}
}

func newTenantOverrides(overrides []kvpb.TenantSetting) *tenantOverrides {
func newTenantOverrides(
ctx context.Context, tenID roachpb.TenantID, overrides []kvpb.TenantSetting,
) *tenantOverrides {
if log.V(1) {
var buf redact.StringBuilder
buf.Printf("loaded overrides for tenant %d (%s)\n", tenID.InternalValue, util.GetSmallTrace(2))
for _, v := range overrides {
buf.Printf("%v = %+v", v.InternalKey, v.Value)
buf.SafeRune('\n')
}
log.VEventf(ctx, 1, "%v", buf)
}
return &tenantOverrides{
overrides: overrides,
changeCh: make(chan struct{}),
Expand All @@ -65,15 +81,17 @@ func (s *overridesStore) Init() {
s.mu.tenants = make(map[roachpb.TenantID]*tenantOverrides)
}

// SetAll initializes the overrides for all tenants. Any existing overrides are
// setAll initializes the overrides for all tenants. Any existing overrides are
// replaced.
//
// The store takes ownership of the overrides slices in the map (the caller can
// no longer modify them).
//
// This method is called once we complete a full initial scan of the
// tenant_setting table.
func (s *overridesStore) SetAll(allOverrides map[roachpb.TenantID][]kvpb.TenantSetting) {
func (s *overridesStore) setAll(
ctx context.Context, allOverrides map[roachpb.TenantID][]kvpb.TenantSetting,
) {
s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -91,18 +109,20 @@ func (s *overridesStore) SetAll(allOverrides map[roachpb.TenantID][]kvpb.TenantS
// Sanity check.
for i := 1; i < len(overrides); i++ {
if overrides[i].InternalKey == overrides[i-1].InternalKey {
panic("duplicate setting")
panic(errors.AssertionFailedf("duplicate setting: %s", overrides[i].InternalKey))
}
}
s.mu.tenants[tenantID] = newTenantOverrides(overrides)
s.mu.tenants[tenantID] = newTenantOverrides(ctx, tenantID, overrides)
}
}

// GetTenantOverrides retrieves the overrides for a given tenant.
// getTenantOverrides retrieves the overrides for a given tenant.
//
// The caller can listen for closing of changeCh, which is guaranteed to happen
// if the tenant's overrides change.
func (s *overridesStore) GetTenantOverrides(tenantID roachpb.TenantID) *tenantOverrides {
func (s *overridesStore) getTenantOverrides(
ctx context.Context, tenantID roachpb.TenantID,
) *tenantOverrides {
s.mu.RLock()
res, ok := s.mu.tenants[tenantID]
s.mu.RUnlock()
Expand All @@ -120,15 +140,17 @@ func (s *overridesStore) GetTenantOverrides(tenantID roachpb.TenantID) *tenantOv
if res, ok = s.mu.tenants[tenantID]; ok {
return res
}
res = newTenantOverrides(nil /* overrides */)
res = newTenantOverrides(ctx, tenantID, nil /* overrides */)
s.mu.tenants[tenantID] = res
return res
}

// SetTenantOverride changes an override for the given tenant. If the setting
// setTenantOverride changes an override for the given tenant. If the setting
// has an empty value, the existing override is removed; otherwise a new
// override is added.
func (s *overridesStore) SetTenantOverride(tenantID roachpb.TenantID, setting kvpb.TenantSetting) {
func (s *overridesStore) setTenantOverride(
ctx context.Context, tenantID roachpb.TenantID, setting kvpb.TenantSetting,
) {
s.mu.Lock()
defer s.mu.Unlock()
var before []kvpb.TenantSetting
Expand All @@ -152,5 +174,5 @@ func (s *overridesStore) SetTenantOverride(tenantID roachpb.TenantID, setting kv
}
// 3. Append all settings after setting.InternalKey.
after = append(after, before...)
s.mu.tenants[tenantID] = newTenantOverrides(after)
s.mu.tenants[tenantID] = newTenantOverrides(ctx, tenantID, after)
}
26 changes: 14 additions & 12 deletions pkg/server/tenantsettingswatcher/overrides_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package tenantsettingswatcher

import (
"context"
"fmt"
"strings"
"testing"
Expand All @@ -25,6 +26,7 @@ import (
func TestOverridesStore(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
var s overridesStore
s.Init()
t1 := roachpb.MustMakeTenantID(1)
Expand Down Expand Up @@ -55,36 +57,36 @@ func TestOverridesStore(t *testing.T) {
t.Fatalf("channel did not close")
}
}
o1 := s.GetTenantOverrides(t1)
o1 := s.getTenantOverrides(ctx, t1)
expect(o1, "")
s.SetAll(map[roachpb.TenantID][]kvpb.TenantSetting{
s.setAll(ctx, map[roachpb.TenantID][]kvpb.TenantSetting{
t1: {st("a", "aa"), st("b", "bb"), st("d", "dd")},
t2: {st("x", "xx")},
})
expectChange(o1)
o1 = s.GetTenantOverrides(t1)
o1 = s.getTenantOverrides(ctx, t1)
expect(o1, "a=aa b=bb d=dd")
o2 := s.GetTenantOverrides(t2)
o2 := s.getTenantOverrides(ctx, t2)
expect(o2, "x=xx")

s.SetTenantOverride(t1, st("b", "changed"))
s.setTenantOverride(ctx, t1, st("b", "changed"))
expectChange(o1)
o1 = s.GetTenantOverrides(t1)
o1 = s.getTenantOverrides(ctx, t1)
expect(o1, "a=aa b=changed d=dd")

s.SetTenantOverride(t1, st("b", ""))
s.setTenantOverride(ctx, t1, st("b", ""))
expectChange(o1)
o1 = s.GetTenantOverrides(t1)
o1 = s.getTenantOverrides(ctx, t1)
expect(o1, "a=aa d=dd")

s.SetTenantOverride(t1, st("c", "cc"))
s.setTenantOverride(ctx, t1, st("c", "cc"))
expectChange(o1)
o1 = s.GetTenantOverrides(t1)
o1 = s.getTenantOverrides(ctx, t1)
expect(o1, "a=aa c=cc d=dd")

// Set an override for a tenant that has no existing data.
t3 := roachpb.MustMakeTenantID(3)
s.SetTenantOverride(t3, st("x", "xx"))
o3 := s.GetTenantOverrides(t3)
s.setTenantOverride(ctx, t3, st("x", "xx"))
o3 := s.getTenantOverrides(ctx, t3)
expect(o3, "x=xx")
}
25 changes: 12 additions & 13 deletions pkg/server/tenantsettingswatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ import (
// if err := w.Start(ctx); err != nil { ... }
//
// // Get overrides and keep them up to date.
// all, allCh := w.AllTenantOverrides()
// tenant, tenantCh := w.TenantOverrides(tenantID)
// all, allCh := w.GetAllTenantOverrides()
// tenant, tenantCh := w.GetTenantOverrides(ctx,tenantID)
// select {
// case <-allCh:
// all, allCh = w.AllTenantOverrides()
// all, allCh = w.GetAllTenantOverrides()
// case <-tenantCh:
// tenant, tenantCh = w.TenantOverrides(tenantID)
// tenant, tenantCh = w.GetTenantOverrides(ctx,tenantID)
// case <-ctx.Done():
// ...
// }
Expand Down Expand Up @@ -152,7 +152,7 @@ func (w *Watcher) startRangeFeed(
allOverrides[tenantID] = append(allOverrides[tenantID], setting)
} else {
// We are processing incremental changes.
w.store.SetTenantOverride(tenantID, setting)
w.store.setTenantOverride(ctx, tenantID, setting)
}
return nil
}
Expand All @@ -162,7 +162,7 @@ func (w *Watcher) startRangeFeed(
// The CompleteUpdate indicates that the table scan is complete.
// Henceforth, all calls to translateEvent will be incremental changes,
// until we hit an error and have to restart the rangefeed.
w.store.SetAll(allOverrides)
w.store.setAll(ctx, allOverrides)
allOverrides = nil

if !initialScan.done {
Expand Down Expand Up @@ -249,9 +249,9 @@ func (w *Watcher) TestingRestart() {
//
// The caller must not modify the returned overrides slice.
func (w *Watcher) GetTenantOverrides(
tenantID roachpb.TenantID,
ctx context.Context, tenantID roachpb.TenantID,
) (overrides []kvpb.TenantSetting, changeCh <-chan struct{}) {
o := w.store.GetTenantOverrides(tenantID)
o := w.store.getTenantOverrides(ctx, tenantID)
return o.overrides, o.changeCh
}

Expand All @@ -260,9 +260,8 @@ func (w *Watcher) GetTenantOverrides(
// have an override for the same setting.
//
// The caller must not modify the returned overrides slice.
func (w *Watcher) GetAllTenantOverrides() (
overrides []kvpb.TenantSetting,
changeCh <-chan struct{},
) {
return w.GetTenantOverrides(allTenantOverridesID)
func (w *Watcher) GetAllTenantOverrides(
ctx context.Context,
) (overrides []kvpb.TenantSetting, changeCh <-chan struct{}) {
return w.GetTenantOverrides(ctx, allTenantOverridesID)
}
Loading

0 comments on commit c704759

Please sign in to comment.