From dc5548e87b174b0dc5dbbe58b95ad1eff29cf362 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 8 Feb 2022 10:28:32 -0800 Subject: [PATCH 1/2] settings: add EncodedValue proto, update tenant settings API This commit consolidates multiple uses of encoded setting values (raw value and type strings) into a `settings.EncodedValue` proto. The tenant settings roachpb API (not used yet) is updated to use this. Release note: None --- docs/generated/http/BUILD.bazel | 1 + pkg/ccl/utilccl/BUILD.bazel | 1 + pkg/ccl/utilccl/license_check_test.go | 16 +++-- pkg/cloud/httpsink/BUILD.bazel | 1 + pkg/cloud/httpsink/http_storage_test.go | 11 ++- pkg/gen/protobuf.bzl | 1 + pkg/roachpb/BUILD.bazel | 2 + pkg/roachpb/api.proto | 7 +- pkg/server/settings_cache.go | 6 +- pkg/server/settingswatcher/BUILD.bazel | 4 -- pkg/server/settingswatcher/overrides.go | 8 ++- pkg/server/settingswatcher/row_decoder.go | 34 ++------- .../settingswatcher/settings_watcher.go | 14 ++-- .../settings_watcher_external_test.go | 10 +-- pkg/settings/BUILD.bazel | 32 ++++++++- pkg/settings/encoding.go | 28 ++++++++ pkg/settings/encoding.proto | 26 +++++++ .../encoding_test.go} | 8 +-- pkg/settings/settings_test.go | 71 ++++++++++--------- pkg/settings/updater.go | 30 ++++---- pkg/sql/set_cluster_setting.go | 6 +- 21 files changed, 203 insertions(+), 114 deletions(-) create mode 100644 pkg/settings/encoding.go create mode 100644 pkg/settings/encoding.proto rename pkg/{server/settingswatcher/row_decoder_test.go => settings/encoding_test.go} (89%) diff --git a/docs/generated/http/BUILD.bazel b/docs/generated/http/BUILD.bazel index 266c86d6a6f9..e59d63f26c1b 100644 --- a/docs/generated/http/BUILD.bazel +++ b/docs/generated/http/BUILD.bazel @@ -27,6 +27,7 @@ genrule( "//pkg/server/diagnostics/diagnosticspb:diagnosticspb_proto", "//pkg/server/serverpb:serverpb_proto", "//pkg/server/status/statuspb:statuspb_proto", + "//pkg/settings:settings_proto", "//pkg/sql/catalog/catpb:catpb_proto", "//pkg/sql/catalog/descpb:descpb_proto", "//pkg/sql/schemachanger/scpb:scpb_proto", diff --git a/pkg/ccl/utilccl/BUILD.bazel b/pkg/ccl/utilccl/BUILD.bazel index db5b8d446d8b..c507000cf248 100644 --- a/pkg/ccl/utilccl/BUILD.bazel +++ b/pkg/ccl/utilccl/BUILD.bazel @@ -45,6 +45,7 @@ go_test( deps = [ "//pkg/base", "//pkg/ccl/utilccl/licenseccl", + "//pkg/settings", "//pkg/settings/cluster", "//pkg/testutils", "//pkg/util/envutil", diff --git a/pkg/ccl/utilccl/license_check_test.go b/pkg/ccl/utilccl/license_check_test.go index 69c5ea547aa7..734a50b6239a 100644 --- a/pkg/ccl/utilccl/license_check_test.go +++ b/pkg/ccl/utilccl/license_check_test.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl/licenseccl" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -64,7 +65,7 @@ func TestSettingAndCheckingLicense(t *testing.T) { {"", idA, t0, "requires an enterprise license"}, } { updater := st.MakeUpdater() - if err := updater.Set(ctx, "enterprise.license", tc.lic, "s"); err != nil { + if err := setLicense(ctx, updater, tc.lic); err != nil { t.Fatal(err) } err := checkEnterpriseEnabledAt(st, tc.checkTime, tc.checkCluster, "", "", true) @@ -92,7 +93,7 @@ func TestGetLicenseTypePresent(t *testing.T) { Type: tc.licenseType, ValidUntilUnixSec: 0, }).Encode() - if err := updater.Set(ctx, "enterprise.license", lic, "s"); err != nil { + if err := setLicense(ctx, updater, lic); err != nil { t.Fatal(err) } actual, err := getLicenseType(st) @@ -125,7 +126,7 @@ func TestSettingBadLicenseStrings(t *testing.T) { st := cluster.MakeTestingClusterSettings() u := st.MakeUpdater() - if err := u.Set(ctx, "enterprise.license", tc.lic, "s"); !testutils.IsError( + if err := setLicense(ctx, u, tc.lic); !testutils.IsError( err, tc.err, ) { t.Fatalf("%q: expected err %q, got %v", tc.lic, tc.err, err) @@ -184,7 +185,7 @@ func TestTimeToEnterpriseLicenseExpiry(t *testing.T) { {"No License", "", 0}, } { t.Run(tc.desc, func(t *testing.T) { - if err := updater.Set(ctx, "enterprise.license", tc.lic, "s"); err != nil { + if err := setLicense(ctx, updater, tc.lic); err != nil { t.Fatal(err) } actual := base.LicenseTTL.Value() @@ -229,3 +230,10 @@ func TestApplyTenantLicenseWithInvalidLicense(t *testing.T) { defer envutil.TestSetEnv(t, "COCKROACH_TENANT_LICENSE", "THIS IS NOT A VALID LICENSE")() require.Error(t, ApplyTenantLicense()) } + +func setLicense(ctx context.Context, updater settings.Updater, val string) error { + return updater.Set(ctx, "enterprise.license", settings.EncodedValue{ + Value: val, + Type: "s", + }) +} diff --git a/pkg/cloud/httpsink/BUILD.bazel b/pkg/cloud/httpsink/BUILD.bazel index df2ee7cc8d27..3230c4c014aa 100644 --- a/pkg/cloud/httpsink/BUILD.bazel +++ b/pkg/cloud/httpsink/BUILD.bazel @@ -29,6 +29,7 @@ go_test( "//pkg/cloud/cloudtestutils", "//pkg/roachpb", "//pkg/security", + "//pkg/settings", "//pkg/settings/cluster", "//pkg/testutils", "//pkg/util/ctxgroup", diff --git a/pkg/cloud/httpsink/http_storage_test.go b/pkg/cloud/httpsink/http_storage_test.go index 83cafc06d48b..40d3b86faee6 100644 --- a/pkg/cloud/httpsink/http_storage_test.go +++ b/pkg/cloud/httpsink/http_storage_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud/cloudtestutils" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -90,13 +91,19 @@ func TestPutHttp(t *testing.T) { })) u := testSettings.MakeUpdater() - if err := u.Set(ctx, "cloudstorage.http.custom_ca", string(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: srv.Certificate().Raw})), "s"); err != nil { + if err := u.Set(ctx, "cloudstorage.http.custom_ca", settings.EncodedValue{ + Value: string(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: srv.Certificate().Raw})), + Type: "s", + }); err != nil { t.Fatal(err) } cleanup := func() { srv.Close() - if err := u.Set(ctx, "cloudstorage.http.custom_ca", "", "s"); err != nil { + if err := u.Set(ctx, "cloudstorage.http.custom_ca", settings.EncodedValue{ + Value: "", + Type: "s", + }); err != nil { t.Fatal(err) } } diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index f82ead8e7e48..df87c11dff58 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -32,6 +32,7 @@ PROTOBUF_SRCS = [ "//pkg/server/diagnostics/diagnosticspb:diagnosticspb_go_proto", "//pkg/server/serverpb:serverpb_go_proto", "//pkg/server/status/statuspb:statuspb_go_proto", + "//pkg/settings:settings_go_proto", "//pkg/sql/catalog/catpb:catpb_go_proto", "//pkg/sql/catalog/descpb:descpb_go_proto", "//pkg/sql/contentionpb:contentionpb_go_proto", diff --git a/pkg/roachpb/BUILD.bazel b/pkg/roachpb/BUILD.bazel index ae7d650ed1db..0c276e1a4035 100644 --- a/pkg/roachpb/BUILD.bazel +++ b/pkg/roachpb/BUILD.bazel @@ -215,6 +215,7 @@ proto_library( deps = [ "//pkg/kv/kvserver/concurrency/lock:lock_proto", "//pkg/kv/kvserver/readsummary/rspb:rspb_proto", + "//pkg/settings:settings_proto", "//pkg/storage/enginepb:enginepb_proto", "//pkg/util:util_proto", "//pkg/util/hlc:hlc_proto", @@ -236,6 +237,7 @@ go_proto_library( deps = [ "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/readsummary/rspb", + "//pkg/settings", "//pkg/storage/enginepb", "//pkg/util", "//pkg/util/hlc", diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index e258d9a0076b..b19f1df76e20 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -19,6 +19,7 @@ import "roachpb/data.proto"; import "roachpb/errors.proto"; import "roachpb/metadata.proto"; import "roachpb/span_config.proto"; +import "settings/encoding.proto"; import "storage/enginepb/mvcc.proto"; import "storage/enginepb/mvcc3.proto"; import "util/hlc/timestamp.proto"; @@ -2737,13 +2738,9 @@ message TenantSettingsEvent { } // TenantSetting contains the name and value of a tenant setting. -// -// The value representation is the same as that used by the system.settings -// table (value and valueType columns). message TenantSetting { string name = 1; - string raw_value = 2; - string value_type = 3; + settings.EncodedValue value = 2 [(gogoproto.nullable) = false]; } // TenantConsumption contains information about resource utilization by a diff --git a/pkg/server/settings_cache.go b/pkg/server/settings_cache.go index 78e41a7e03ec..7a0196572838 100644 --- a/pkg/server/settings_cache.go +++ b/pkg/server/settings_cache.go @@ -141,14 +141,14 @@ func initializeCachedSettings( ) error { dec := settingswatcher.MakeRowDecoder(codec) for _, kv := range kvs { - settings, rv, _, err := dec.DecodeRow(kv) + settings, val, _, err := dec.DecodeRow(kv) if err != nil { return errors.Wrap(err, `while decoding settings data -this likely indicates the settings table structure or encoding has been altered; -skipping settings updates`) } - if err := updater.Set(ctx, settings, rv.Value, rv.Type); err != nil { - log.Warningf(ctx, "setting %q to %v failed: %+v", settings, rv, err) + if err := updater.Set(ctx, settings, val); err != nil { + log.Warningf(ctx, "setting %q to %v failed: %+v", settings, val, err) } } updater.ResetRemaining(ctx) diff --git a/pkg/server/settingswatcher/BUILD.bazel b/pkg/server/settingswatcher/BUILD.bazel index 96376b230966..909873ad9436 100644 --- a/pkg/server/settingswatcher/BUILD.bazel +++ b/pkg/server/settingswatcher/BUILD.bazel @@ -30,8 +30,6 @@ go_library( "//pkg/util/stop", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", - "@com_github_cockroachdb_redact//:redact", - "@com_github_cockroachdb_redact//interfaces", ], ) @@ -40,7 +38,6 @@ go_test( srcs = [ "main_test.go", "row_decoder_external_test.go", - "row_decoder_test.go", "settings_watcher_external_test.go", "settings_watcher_test.go", ], @@ -66,7 +63,6 @@ go_test( "//pkg/util/leaktest", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", - "@com_github_cockroachdb_redact//:redact", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/server/settingswatcher/overrides.go b/pkg/server/settingswatcher/overrides.go index 8986ba34b94a..88b22c24ea48 100644 --- a/pkg/server/settingswatcher/overrides.go +++ b/pkg/server/settingswatcher/overrides.go @@ -10,6 +10,8 @@ package settingswatcher +import "github.com/cockroachdb/cockroach/pkg/settings" + // OverridesMonitor is an interface through which the settings watcher can // receive setting overrides. Used for non-system tenants. // @@ -22,7 +24,7 @@ type OverridesMonitor interface { NotifyCh() <-chan struct{} // Overrides retrieves the current set of setting overrides, as a map from - // setting key to RawValue. Any settings that are present must be set to the - // overridden value. - Overrides() map[string]RawValue + // setting key to EncodedValue. Any settings that are present must be set to + // the overridden value. + Overrides() map[string]settings.EncodedValue } diff --git a/pkg/server/settingswatcher/row_decoder.go b/pkg/server/settingswatcher/row_decoder.go index 38f03f259957..c2169e11a113 100644 --- a/pkg/server/settingswatcher/row_decoder.go +++ b/pkg/server/settingswatcher/row_decoder.go @@ -13,6 +13,7 @@ package settingswatcher import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -20,8 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" - "github.com/cockroachdb/redact" - "github.com/cockroachdb/redact/interfaces" ) // RowDecoder decodes rows from the settings table. @@ -32,25 +31,6 @@ type RowDecoder struct { decoder valueside.Decoder } -// RawValue contains a raw-value / value-type pair, corresponding to the value -// and valueType columns of the settings table. -type RawValue struct { - Value string - Type string -} - -// String is part of fmt.Stringer. -func (r RawValue) String() string { - return redact.Sprint(r).StripMarkers() -} - -// SafeFormat is part of redact.SafeFormatter. -func (r RawValue) SafeFormat(s interfaces.SafePrinter, verb rune) { - s.Printf("%q (%s)", r.Value, redact.SafeString(r.Type)) -} - -var _ redact.SafeFormatter = (*RawValue)(nil) - // MakeRowDecoder makes a new RowDecoder for the settings table. func MakeRowDecoder(codec keys.SQLCodec) RowDecoder { columns := systemschema.SettingsTable.PublicColumns() @@ -66,33 +46,33 @@ func MakeRowDecoder(codec keys.SQLCodec) RowDecoder { // tombstone bool will be set. func (d *RowDecoder) DecodeRow( kv roachpb.KeyValue, -) (setting string, val RawValue, tombstone bool, _ error) { +) (setting string, val settings.EncodedValue, tombstone bool, _ error) { // First we need to decode the setting name field from the index key. { types := []*types.T{d.columns[0].GetType()} nameRow := make([]rowenc.EncDatum, 1) _, _, err := rowenc.DecodeIndexKey(d.codec, types, nameRow, nil, kv.Key) if err != nil { - return "", RawValue{}, false, errors.Wrap(err, "failed to decode key") + return "", settings.EncodedValue{}, false, errors.Wrap(err, "failed to decode key") } if err := nameRow[0].EnsureDecoded(types[0], &d.alloc); err != nil { - return "", RawValue{}, false, err + return "", settings.EncodedValue{}, false, err } setting = string(tree.MustBeDString(nameRow[0].Datum)) } if !kv.Value.IsPresent() { - return setting, RawValue{}, true, nil + return setting, settings.EncodedValue{}, true, nil } // The rest of the columns are stored as a family. bytes, err := kv.Value.GetTuple() if err != nil { - return "", RawValue{}, false, err + return "", settings.EncodedValue{}, false, err } datums, err := d.decoder.Decode(&d.alloc, bytes) if err != nil { - return "", RawValue{}, false, err + return "", settings.EncodedValue{}, false, err } if value := datums[1]; value != tree.DNull { diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index ce0c10045f0e..786937fb7a04 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -48,8 +48,8 @@ type SettingsWatcher struct { syncutil.Mutex updater settings.Updater - values map[string]RawValue - overrides map[string]RawValue + values map[string]settings.EncodedValue + overrides map[string]settings.EncodedValue } // testingWatcherKnobs allows the client to inject testing knobs into @@ -128,10 +128,10 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { } } - s.mu.values = make(map[string]RawValue) + s.mu.values = make(map[string]settings.EncodedValue) if s.overridesMonitor != nil { - s.mu.overrides = make(map[string]RawValue) + s.mu.overrides = make(map[string]settings.EncodedValue) // Initialize the overrides. We want to do this before processing the // settings table, otherwise we could see temporary transitions to the value // in the table. @@ -272,7 +272,7 @@ func (s *SettingsWatcher) handleKV( const versionSettingKey = "version" // set the current value of a setting. -func (s *SettingsWatcher) setLocked(ctx context.Context, key string, val RawValue) { +func (s *SettingsWatcher) setLocked(ctx context.Context, key string, val settings.EncodedValue) { // The system tenant (i.e. the KV layer) does not use the SettingsWatcher // to propagate cluster version changes (it uses the BumpClusterVersion // RPC). However, non-system tenants (i.e. SQL pods) (asynchronously) get @@ -289,7 +289,7 @@ func (s *SettingsWatcher) setLocked(ctx context.Context, key string, val RawValu return } - if err := s.mu.updater.Set(ctx, key, val.Value, val.Type); err != nil { + if err := s.mu.updater.Set(ctx, key, val); err != nil { log.Warningf(ctx, "failed to set setting %s to %s: %v", log.Safe(key), val.Value, err) } } @@ -305,7 +305,7 @@ func (s *SettingsWatcher) setDefaultLocked(ctx context.Context, key string) { if !ok { log.Fatalf(ctx, "expected non-masked setting, got %T", s) } - val := RawValue{ + val := settings.EncodedValue{ Value: ws.EncodedDefault(), Type: ws.Typ(), } diff --git a/pkg/server/settingswatcher/settings_watcher_external_test.go b/pkg/server/settingswatcher/settings_watcher_external_test.go index fd567ad794a7..a19cfc3b953f 100644 --- a/pkg/server/settingswatcher/settings_watcher_external_test.go +++ b/pkg/server/settingswatcher/settings_watcher_external_test.go @@ -305,7 +305,7 @@ type testingOverrideMonitor struct { mu struct { syncutil.Mutex - overrides map[string]settingswatcher.RawValue + overrides map[string]settings.EncodedValue } } @@ -315,7 +315,7 @@ func newTestingOverrideMonitor() *testingOverrideMonitor { m := &testingOverrideMonitor{ ch: make(chan struct{}, 1), } - m.mu.overrides = make(map[string]settingswatcher.RawValue) + m.mu.overrides = make(map[string]settings.EncodedValue) return m } @@ -330,7 +330,7 @@ func (m *testingOverrideMonitor) set(key string, val string, valType string) { m.mu.Lock() defer m.mu.Unlock() - m.mu.overrides[key] = settingswatcher.RawValue{ + m.mu.overrides[key] = settings.EncodedValue{ Value: val, Type: valType, } @@ -348,10 +348,10 @@ func (m *testingOverrideMonitor) NotifyCh() <-chan struct{} { } // Overrides is part of the settingswatcher.OverridesMonitor interface. -func (m *testingOverrideMonitor) Overrides() map[string]settingswatcher.RawValue { +func (m *testingOverrideMonitor) Overrides() map[string]settings.EncodedValue { m.mu.Lock() defer m.mu.Unlock() - res := make(map[string]settingswatcher.RawValue) + res := make(map[string]settings.EncodedValue) for k, v := range m.mu.overrides { res[k] = v } diff --git a/pkg/settings/BUILD.bazel b/pkg/settings/BUILD.bazel index 7f52aef43c52..e24609646faf 100644 --- a/pkg/settings/BUILD.bazel +++ b/pkg/settings/BUILD.bazel @@ -1,3 +1,5 @@ +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( @@ -8,6 +10,7 @@ go_library( "common.go", "doc.go", "duration.go", + "encoding.go", "enum.go", "float.go", "int.go", @@ -19,6 +22,7 @@ go_library( "values.go", "version.go", ], + embed = [":settings_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/settings", visibility = ["//visibility:public"], deps = [ @@ -26,19 +30,43 @@ go_library( "//pkg/util/humanizeutil", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", + "@com_github_cockroachdb_redact//interfaces", ], ) go_test( name = "settings_test", size = "small", - srcs = ["settings_test.go"], + srcs = [ + "encoding_test.go", + "settings_test.go", + ], + embed = [":settings"], deps = [ - ":settings", "//pkg/testutils", "//pkg/testutils/skip", + "//pkg/util/leaktest", "//pkg/util/protoutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", "@com_github_stretchr_testify//require", ], ) + +proto_library( + name = "settings_proto", + srcs = ["encoding.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"], +) + +go_proto_library( + name = "settings_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/settings", + proto = ":settings_proto", + visibility = ["//visibility:public"], + deps = ["@com_github_gogo_protobuf//gogoproto"], +) diff --git a/pkg/settings/encoding.go b/pkg/settings/encoding.go new file mode 100644 index 000000000000..0bf26c96e5fc --- /dev/null +++ b/pkg/settings/encoding.go @@ -0,0 +1,28 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package settings + +import ( + "github.com/cockroachdb/redact" + "github.com/cockroachdb/redact/interfaces" +) + +// String is part of fmt.Stringer. +func (v EncodedValue) String() string { + return redact.Sprint(v).StripMarkers() +} + +// SafeFormat is part of redact.SafeFormatter. +func (v EncodedValue) SafeFormat(s interfaces.SafePrinter, verb rune) { + s.Printf("%q (%s)", v.Value, redact.SafeString(v.Type)) +} + +var _ redact.SafeFormatter = EncodedValue{} diff --git a/pkg/settings/encoding.proto b/pkg/settings/encoding.proto new file mode 100644 index 000000000000..f68a221776ba --- /dev/null +++ b/pkg/settings/encoding.proto @@ -0,0 +1,26 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.settings; +option go_package = "settings"; + +import "gogoproto/gogo.proto"; + +// EncodedValue contains the value of a cluster setting serialized as an opaque +// string, along with a type identifier. Used when storing setting values on +// disk or passing them over the wire. +message EncodedValue { + option (gogoproto.equal) = true; + option (gogoproto.goproto_stringer) = false; + + string value = 1; + string type = 2; +} diff --git a/pkg/server/settingswatcher/row_decoder_test.go b/pkg/settings/encoding_test.go similarity index 89% rename from pkg/server/settingswatcher/row_decoder_test.go rename to pkg/settings/encoding_test.go index 048ac5c4d191..fc794850b903 100644 --- a/pkg/server/settingswatcher/row_decoder_test.go +++ b/pkg/settings/encoding_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package settingswatcher +package settings import ( "testing" @@ -18,16 +18,16 @@ import ( "github.com/stretchr/testify/require" ) -func TestRawValue_SafeFormat(t *testing.T) { +func TestEncodedValueSafeFormat(t *testing.T) { defer leaktest.AfterTest(t)() for _, tc := range []struct { - rv RawValue + rv EncodedValue redacted string regular string }{ { - rv: RawValue{ + rv: EncodedValue{ Value: "asdf", Type: "b", }, diff --git a/pkg/settings/settings_test.go b/pkg/settings/settings_test.go index 9e0af4ad256c..71893a55797e 100644 --- a/pkg/settings/settings_test.go +++ b/pkg/settings/settings_test.go @@ -203,16 +203,16 @@ func TestValidation(t *testing.T) { u := settings.NewUpdater(sv) t.Run("d_with_maximum", func(t *testing.T) { - err := u.Set(ctx, "d_with_maximum", "1h", "d") + err := u.Set(ctx, "d_with_maximum", v("1h", "d")) require.NoError(t, err) - err = u.Set(ctx, "d_with_maximum", "0h", "d") + err = u.Set(ctx, "d_with_maximum", v("0h", "d")) require.NoError(t, err) - err = u.Set(ctx, "d_with_maximum", "30m", "d") + err = u.Set(ctx, "d_with_maximum", v("30m", "d")) require.NoError(t, err) - err = u.Set(ctx, "d_with_maximum", "-1m", "d") + err = u.Set(ctx, "d_with_maximum", v("-1m", "d")) require.Error(t, err) - err = u.Set(ctx, "d_with_maximum", "1h1s", "d") + err = u.Set(ctx, "d_with_maximum", v("1h1s", "d")) require.Error(t, err) }) } @@ -383,46 +383,46 @@ func TestCache(t *testing.T) { if expected, actual := 0, changes.boolTA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "bool.t", settings.EncodeBool(false), "b"); err != nil { + if err := u.Set(ctx, "bool.t", v(settings.EncodeBool(false), "b")); err != nil { t.Fatal(err) } if expected, actual := 1, changes.boolTA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "bool.f", settings.EncodeBool(true), "b"); err != nil { + if err := u.Set(ctx, "bool.f", v(settings.EncodeBool(true), "b")); err != nil { t.Fatal(err) } if expected, actual := 0, changes.strFooA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "str.foo", "baz", "s"); err != nil { + if err := u.Set(ctx, "str.foo", v("baz", "s")); err != nil { t.Fatal(err) } if expected, actual := 1, changes.strFooA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "str.val", "valid", "s"); err != nil { + if err := u.Set(ctx, "str.val", v("valid", "s")); err != nil { t.Fatal(err) } - if err := u.Set(ctx, "i.2", settings.EncodeInt(3), "i"); err != nil { + if err := u.Set(ctx, "i.2", v(settings.EncodeInt(3), "i")); err != nil { t.Fatal(err) } if expected, actual := 0, changes.fA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "f", settings.EncodeFloat(3.1), "f"); err != nil { + if err := u.Set(ctx, "f", v(settings.EncodeFloat(3.1), "f")); err != nil { t.Fatal(err) } if expected, actual := 1, changes.fA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "fVal", settings.EncodeFloat(3.1), "f"); err != nil { + if err := u.Set(ctx, "fVal", v(settings.EncodeFloat(3.1), "f")); err != nil { t.Fatal(err) } if expected, actual := 0, changes.dA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "d", settings.EncodeDuration(2*time.Hour), "d"); err != nil { + if err := u.Set(ctx, "d", v(settings.EncodeDuration(2*time.Hour), "d")); err != nil { t.Fatal(err) } if expected, actual := 1, changes.dA; expected != actual { @@ -431,38 +431,38 @@ func TestCache(t *testing.T) { if expected, actual := 0, changes.duA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "d_with_explicit_unit", settings.EncodeDuration(2*time.Hour), "d"); err != nil { + if err := u.Set(ctx, "d_with_explicit_unit", v(settings.EncodeDuration(2*time.Hour), "d")); err != nil { t.Fatal(err) } if expected, actual := 1, changes.duA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "dVal", settings.EncodeDuration(2*time.Hour), "d"); err != nil { + if err := u.Set(ctx, "dVal", v(settings.EncodeDuration(2*time.Hour), "d")); err != nil { t.Fatal(err) } if expected, actual := 0, changes.byteSize; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "zzz", settings.EncodeInt(mb*5), "z"); err != nil { + if err := u.Set(ctx, "zzz", v(settings.EncodeInt(mb*5), "z")); err != nil { t.Fatal(err) } if expected, actual := 1, changes.byteSize; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "byteSize.Val", settings.EncodeInt(mb*5), "z"); err != nil { + if err := u.Set(ctx, "byteSize.Val", v(settings.EncodeInt(mb*5), "z")); err != nil { t.Fatal(err) } if expected, actual := 0, changes.eA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "e", settings.EncodeInt(2), "e"); err != nil { + if err := u.Set(ctx, "e", v(settings.EncodeInt(2), "e")); err != nil { t.Fatal(err) } if expected, actual := 1, changes.eA; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } if expected, err := "strconv.Atoi: parsing \"notAValidValue\": invalid syntax", - u.Set(ctx, "e", "notAValidValue", "e"); !testutils.IsError(err, expected) { + u.Set(ctx, "e", v("notAValidValue", "e")); !testutils.IsError(err, expected) { t.Fatalf("expected '%s' != actual error '%s'", expected, err) } defaultDummyV := dummyVersion{msg1: "default", growsbyone: "AB"} @@ -523,22 +523,22 @@ func TestCache(t *testing.T) { t.Run("any setting not included in an Updater reverts to default", func(t *testing.T) { { u := settings.NewUpdater(sv) - if err := u.Set(ctx, "bool.f", settings.EncodeBool(true), "b"); err != nil { + if err := u.Set(ctx, "bool.f", v(settings.EncodeBool(true), "b")); err != nil { t.Fatal(err) } if expected, actual := 0, changes.i1A; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "i.1", settings.EncodeInt(1), "i"); err != nil { + if err := u.Set(ctx, "i.1", v(settings.EncodeInt(1), "i")); err != nil { t.Fatal(err) } if expected, actual := 1, changes.i1A; expected != actual { t.Fatalf("expected %d, got %d", expected, actual) } - if err := u.Set(ctx, "i.2", settings.EncodeInt(7), "i"); err != nil { + if err := u.Set(ctx, "i.2", v(settings.EncodeInt(7), "i")); err != nil { t.Fatal(err) } - if err := u.Set(ctx, "i.Val", settings.EncodeInt(1), "i"); err != nil { + if err := u.Set(ctx, "i.Val", v(settings.EncodeInt(1), "i")); err != nil { t.Fatal(err) } u.ResetRemaining(ctx) @@ -571,7 +571,7 @@ func TestCache(t *testing.T) { t.Run("an invalid update to a given setting preserves its previously set value", func(t *testing.T) { { u := settings.NewUpdater(sv) - if err := u.Set(ctx, "i.2", settings.EncodeInt(9), "i"); err != nil { + if err := u.Set(ctx, "i.2", v(settings.EncodeInt(9), "i")); err != nil { t.Fatal(err) } u.ResetRemaining(ctx) @@ -582,7 +582,7 @@ func TestCache(t *testing.T) { // value. { u := settings.NewUpdater(sv) - if err := u.Set(ctx, "i.2", settings.EncodeBool(false), "b"); !testutils.IsError(err, + if err := u.Set(ctx, "i.2", v(settings.EncodeBool(false), "b")); !testutils.IsError(err, "setting 'i.2' defined as type i, not b", ) { t.Fatal(err) @@ -598,7 +598,7 @@ func TestCache(t *testing.T) { // current value. { u := settings.NewUpdater(sv) - if err := u.Set(ctx, "i.2", settings.EncodeBool(false), "i"); !testutils.IsError(err, + if err := u.Set(ctx, "i.2", v(settings.EncodeBool(false), "i")); !testutils.IsError(err, "strconv.Atoi: parsing \"false\": invalid syntax", ) { t.Fatal(err) @@ -615,7 +615,7 @@ func TestCache(t *testing.T) { beforestrVal := strVal.Get(sv) { u := settings.NewUpdater(sv) - if err := u.Set(ctx, "str.val", "abc2def", "s"); !testutils.IsError(err, + if err := u.Set(ctx, "str.val", v("abc2def", "s")); !testutils.IsError(err, "not all runes of abc2def are letters: 2", ) { t.Fatal(err) @@ -629,7 +629,7 @@ func TestCache(t *testing.T) { beforeDVal := dVal.Get(sv) { u := settings.NewUpdater(sv) - if err := u.Set(ctx, "dVal", settings.EncodeDuration(-time.Hour), "d"); !testutils.IsError(err, + if err := u.Set(ctx, "dVal", v(settings.EncodeDuration(-time.Hour), "d")); !testutils.IsError(err, "cannot be set to a negative duration: -1h0m0s", ) { t.Fatal(err) @@ -643,7 +643,7 @@ func TestCache(t *testing.T) { beforeByteSizeVal := byteSizeVal.Get(sv) { u := settings.NewUpdater(sv) - if err := u.Set(ctx, "byteSize.Val", settings.EncodeInt(-mb), "z"); !testutils.IsError(err, + if err := u.Set(ctx, "byteSize.Val", v(settings.EncodeInt(-mb), "z")); !testutils.IsError(err, "bytesize cannot be negative", ) { t.Fatal(err) @@ -657,7 +657,7 @@ func TestCache(t *testing.T) { beforeFVal := fVal.Get(sv) { u := settings.NewUpdater(sv) - if err := u.Set(ctx, "fVal", settings.EncodeFloat(-1.1), "f"); !testutils.IsError(err, + if err := u.Set(ctx, "fVal", v(settings.EncodeFloat(-1.1), "f")); !testutils.IsError(err, "cannot set to a negative value: -1.1", ) { t.Fatal(err) @@ -671,7 +671,7 @@ func TestCache(t *testing.T) { beforeIVal := iVal.Get(sv) { u := settings.NewUpdater(sv) - if err := u.Set(ctx, "i.Val", settings.EncodeInt(-1), "i"); !testutils.IsError(err, + if err := u.Set(ctx, "i.Val", v(settings.EncodeInt(-1), "i")); !testutils.IsError(err, "int cannot be negative", ) { t.Fatal(err) @@ -721,7 +721,7 @@ func TestOnChangeWithMaxSettings(t *testing.T) { intSetting.SetOnChange(sv, func(ctx context.Context) { changes++ }) u := settings.NewUpdater(sv) - if err := u.Set(ctx, maxName, settings.EncodeInt(9), "i"); err != nil { + if err := u.Set(ctx, maxName, v(settings.EncodeInt(9), "i")); err != nil { t.Fatal(err) } @@ -837,3 +837,10 @@ func setDummyVersion(dv dummyVersion, vs *settings.VersionSetting, sv *settings. vs.SetInternal(context.Background(), sv, encoded) return nil } + +func v(val, typ string) settings.EncodedValue { + return settings.EncodedValue{ + Value: val, + Type: typ, + } +} diff --git a/pkg/settings/updater.go b/pkg/settings/updater.go index 9499c809ad09..e45270620593 100644 --- a/pkg/settings/updater.go +++ b/pkg/settings/updater.go @@ -18,22 +18,22 @@ import ( "github.com/cockroachdb/errors" ) -// EncodeDuration encodes a duration in the format parseRaw expects. +// EncodeDuration encodes a duration in the format of EncodedValue.Value. func EncodeDuration(d time.Duration) string { return d.String() } -// EncodeBool encodes a bool in the format parseRaw expects. +// EncodeBool encodes a bool in the format of EncodedValue.Value. func EncodeBool(b bool) string { return strconv.FormatBool(b) } -// EncodeInt encodes an int in the format parseRaw expects. +// EncodeInt encodes an int in the format of EncodedValue.Value. func EncodeInt(i int64) string { return strconv.FormatInt(i, 10) } -// EncodeFloat encodes a bool in the format parseRaw expects. +// EncodeFloat encodes a float in the format of EncodedValue.Value. func EncodeFloat(f float64) string { return strconv.FormatFloat(f, 'G', -1, 64) } @@ -50,7 +50,7 @@ type updater struct { // wrapped atomic settings values as we go and note which settings were updated, // then set the rest to default in ResetRemaining(). type Updater interface { - Set(ctx context.Context, key, rawValue, valType string) error + Set(ctx context.Context, key string, value EncodedValue) error ResetRemaining(ctx context.Context) } @@ -58,7 +58,7 @@ type Updater interface { type NoopUpdater struct{} // Set implements Updater. It is a no-op. -func (u NoopUpdater) Set(ctx context.Context, key, rawValue, valType string) error { return nil } +func (u NoopUpdater) Set(ctx context.Context, key string, value EncodedValue) error { return nil } // ResetRemaining implements Updater. It is a no-op. func (u NoopUpdater) ResetRemaining(context.Context) {} @@ -72,7 +72,7 @@ func NewUpdater(sv *Values) Updater { } // Set attempts to parse and update a setting and notes that it was updated. -func (u updater) Set(ctx context.Context, key, rawValue string, vt string) error { +func (u updater) Set(ctx context.Context, key string, value EncodedValue) error { d, ok := registry[key] if !ok { if _, ok := retiredSettings[key]; ok { @@ -84,40 +84,40 @@ func (u updater) Set(ctx context.Context, key, rawValue string, vt string) error u.m[key] = struct{}{} - if expected := d.Typ(); vt != expected { - return errors.Errorf("setting '%s' defined as type %s, not %s", key, expected, vt) + if expected := d.Typ(); value.Type != expected { + return errors.Errorf("setting '%s' defined as type %s, not %s", key, expected, value.Type) } switch setting := d.(type) { case *StringSetting: - return setting.set(ctx, u.sv, rawValue) + return setting.set(ctx, u.sv, value.Value) case *BoolSetting: - b, err := strconv.ParseBool(rawValue) + b, err := strconv.ParseBool(value.Value) if err != nil { return err } setting.set(ctx, u.sv, b) return nil case numericSetting: - i, err := strconv.Atoi(rawValue) + i, err := strconv.Atoi(value.Value) if err != nil { return err } return setting.set(ctx, u.sv, int64(i)) case *FloatSetting: - f, err := strconv.ParseFloat(rawValue, 64) + f, err := strconv.ParseFloat(value.Value, 64) if err != nil { return err } return setting.set(ctx, u.sv, f) case *DurationSetting: - d, err := time.ParseDuration(rawValue) + d, err := time.ParseDuration(value.Value) if err != nil { return err } return setting.set(ctx, u.sv, d) case *DurationSettingWithExplicitUnit: - d, err := time.ParseDuration(rawValue) + d, err := time.ParseDuration(value.Value) if err != nil { return err } diff --git a/pkg/sql/set_cluster_setting.go b/pkg/sql/set_cluster_setting.go index 0ac165cb9c2f..c2b25b5f62b9 100644 --- a/pkg/sql/set_cluster_setting.go +++ b/pkg/sql/set_cluster_setting.go @@ -299,7 +299,11 @@ func (n *setClusterSettingNode) startExec(params runParams) error { } if knobs := params.p.execCfg.TenantTestingKnobs; knobs != nil && knobs.ClusterSettingsUpdater != nil { - if err := params.p.execCfg.TenantTestingKnobs.ClusterSettingsUpdater.Set(ctx, n.name, encoded, n.setting.Typ()); err != nil { + encVal := settings.EncodedValue{ + Value: encoded, + Type: n.setting.Typ(), + } + if err := params.p.execCfg.TenantTestingKnobs.ClusterSettingsUpdater.Set(ctx, n.name, encVal); err != nil { return err } } From af5a690048066114bdc581dda818313046ba5158 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 8 Feb 2022 10:28:32 -0800 Subject: [PATCH 2/2] multitenant: listen for setting overrides This implements the tenant side code for setting overrides. Specifically, the tenant connector now implements the OverridesMonitor interface using the TenantSettings API. The server side of this API is not yet implemented, so this commit does not include end-to-end tests. Basic functionality is verified through a unit test that mocks the server-side API. Informs #73857. Release note: None --- pkg/ccl/kvccl/kvtenantccl/BUILD.bazel | 8 +- pkg/ccl/kvccl/kvtenantccl/connector.go | 73 +++++-- pkg/ccl/kvccl/kvtenantccl/connector_test.go | 24 ++- .../kvccl/kvtenantccl/setting_overrides.go | 151 ++++++++++++++ .../kvtenantccl/setting_overrides_test.go | 190 ++++++++++++++++++ pkg/kv/kvclient/kvtenant/BUILD.bazel | 1 + pkg/kv/kvclient/kvtenant/connector.go | 7 +- pkg/roachpb/api.proto | 2 +- pkg/rpc/auth_tenant.go | 17 ++ pkg/server/BUILD.bazel | 1 + pkg/server/node.go | 22 +- pkg/server/server_sql.go | 15 +- pkg/server/settingswatcher/BUILD.bazel | 3 +- pkg/server/settingswatcher/overrides.go | 7 +- .../settingswatcher/settings_watcher.go | 7 +- .../settings_watcher_external_test.go | 81 +++++++- .../settingswatcher/settings_watcher_test.go | 104 ---------- pkg/server/tenant.go | 1 + 18 files changed, 567 insertions(+), 147 deletions(-) create mode 100644 pkg/ccl/kvccl/kvtenantccl/setting_overrides.go create mode 100644 pkg/ccl/kvccl/kvtenantccl/setting_overrides_test.go delete mode 100644 pkg/server/settingswatcher/settings_watcher_test.go diff --git a/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel b/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel index 117b10c42abb..16ec8993c2b6 100644 --- a/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "kvtenantccl", - srcs = ["connector.go"], + srcs = [ + "connector.go", + "setting_overrides.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl", visibility = ["//visibility:public"], deps = [ @@ -16,6 +19,7 @@ go_library( "//pkg/roachpb", "//pkg/rpc", "//pkg/server/serverpb", + "//pkg/settings", "//pkg/spanconfig", "//pkg/util/contextutil", "//pkg/util/grpcutil", @@ -38,6 +42,7 @@ go_test( srcs = [ "connector_test.go", "main_test.go", + "setting_overrides_test.go", "tenant_kv_test.go", "tenant_trace_test.go", "tenant_upgrade_test.go", @@ -60,6 +65,7 @@ go_test( "//pkg/security", "//pkg/security/securitytest", "//pkg/server", + "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql", "//pkg/testutils", diff --git a/pkg/ccl/kvccl/kvtenantccl/connector.go b/pkg/ccl/kvccl/kvtenantccl/connector.go index bfaa9a225284..2cfe489e98b7 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" @@ -63,13 +64,13 @@ func init() { type Connector struct { log.AmbientContext + tenantID roachpb.TenantID rpcContext *rpc.Context rpcRetryOptions retry.Options rpcDialTimeout time.Duration // for testing rpcDial singleflight.Group defaultZoneCfg *zonepb.ZoneConfig addrs []string - startupC chan struct{} mu struct { syncutil.RWMutex @@ -78,6 +79,15 @@ type Connector struct { systemConfig *config.SystemConfig systemConfigChannels []chan<- struct{} } + + settingsMu struct { + syncutil.Mutex + + allTenantOverrides map[string]settings.EncodedValue + specificOverrides map[string]settings.EncodedValue + // notifyCh receives an event when there are changes to overrides. + notifyCh chan struct{} + } } // client represents an RPC client that proxies to a KV instance. @@ -117,14 +127,22 @@ var _ spanconfig.KVAccessor = (*Connector)(nil) // NOTE: Calling Start will set cfg.RPCContext.ClusterID. func NewConnector(cfg kvtenant.ConnectorConfig, addrs []string) *Connector { cfg.AmbientCtx.AddLogTag("tenant-connector", nil) - return &Connector{ + if cfg.TenantID.IsSystem() { + panic("TenantID not set") + } + c := &Connector{ + tenantID: cfg.TenantID, AmbientContext: cfg.AmbientCtx, rpcContext: cfg.RPCContext, rpcRetryOptions: cfg.RPCRetryOptions, defaultZoneCfg: cfg.DefaultZoneConfig, addrs: addrs, - startupC: make(chan struct{}), } + + c.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor) + c.settingsMu.allTenantOverrides = make(map[string]settings.EncodedValue) + c.settingsMu.specificOverrides = make(map[string]settings.EncodedValue) + return c } // connectorFactory implements kvtenant.ConnectorFactory. @@ -140,26 +158,46 @@ func (connectorFactory) NewConnector( // connect to a KV node. Start returns once the connector has determined the // cluster's ID and set Connector.rpcContext.ClusterID. func (c *Connector) Start(ctx context.Context) error { - startupC := c.startupC + gossipStartupCh := make(chan struct{}) + settingsStartupCh := make(chan struct{}) bgCtx := c.AnnotateCtx(context.Background()) - if err := c.rpcContext.Stopper.RunAsyncTask(bgCtx, "connector", func(ctx context.Context) { + + if err := c.rpcContext.Stopper.RunAsyncTask(bgCtx, "connector-gossip", func(ctx context.Context) { ctx = c.AnnotateCtx(ctx) ctx, cancel := c.rpcContext.Stopper.WithCancelOnQuiesce(ctx) defer cancel() - c.runGossipSubscription(ctx) + c.runGossipSubscription(ctx, gossipStartupCh) }); err != nil { return err } - // Synchronously block until the first GossipSubscription event. - select { - case <-startupC: - return nil - case <-ctx.Done(): - return ctx.Err() + if err := c.rpcContext.Stopper.RunAsyncTask(bgCtx, "connector-settings", func(ctx context.Context) { + ctx = c.AnnotateCtx(ctx) + ctx, cancel := c.rpcContext.Stopper.WithCancelOnQuiesce(ctx) + defer cancel() + c.runTenantSettingsSubscription(ctx, settingsStartupCh) + }); err != nil { + return err } + + // Block until we receive the first GossipSubscription event and the initial + // setting overrides. + for gossipStartupCh != nil || settingsStartupCh != nil { + select { + case <-gossipStartupCh: + gossipStartupCh = nil + case <-settingsStartupCh: + settingsStartupCh = nil + case <-ctx.Done(): + return ctx.Err() + } + } + return nil } -func (c *Connector) runGossipSubscription(ctx context.Context) { +// runGossipSubscription listens for gossip subscription events. It closes the +// given channel once the ClusterID gossip key has been handled. +// Exits when the context is done. +func (c *Connector) runGossipSubscription(ctx context.Context, startupCh chan struct{}) { for ctx.Err() == nil { client, err := c.getClient(ctx) if err != nil { @@ -198,9 +236,9 @@ func (c *Connector) runGossipSubscription(ctx context.Context) { // Signal that startup is complete once the ClusterID gossip key has // been handled. - if c.startupC != nil && e.PatternMatched == gossip.KeyClusterID { - close(c.startupC) - c.startupC = nil + if startupCh != nil && e.PatternMatched == gossip.KeyClusterID { + close(startupCh) + startupCh = nil } } } @@ -257,9 +295,6 @@ func (c *Connector) updateNodeAddress(ctx context.Context, key string, content r // nothing ever removes them from Gossip.nodeDescs. Fix this. c.mu.Lock() defer c.mu.Unlock() - if c.mu.nodeDescs == nil { - c.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor) - } c.mu.nodeDescs[desc.NodeID] = desc } diff --git a/pkg/ccl/kvccl/kvtenantccl/connector_test.go b/pkg/ccl/kvccl/kvtenantccl/connector_test.go index 2a2ebc00b7a2..cc664f5b7fd9 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector_test.go @@ -46,8 +46,9 @@ var rpcRetryOpts = retry.Options{ var _ roachpb.InternalServer = &mockServer{} type mockServer struct { - rangeLookupFn func(context.Context, *roachpb.RangeLookupRequest) (*roachpb.RangeLookupResponse, error) - gossipSubFn func(*roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer) error + rangeLookupFn func(context.Context, *roachpb.RangeLookupRequest) (*roachpb.RangeLookupResponse, error) + gossipSubFn func(*roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer) error + tenantSettingsFn func(request *roachpb.TenantSettingsRequest, server roachpb.Internal_TenantSettingsServer) error } func (m *mockServer) RangeLookup( @@ -62,6 +63,19 @@ func (m *mockServer) GossipSubscription( return m.gossipSubFn(req, stream) } +func (m *mockServer) TenantSettings( + req *roachpb.TenantSettingsRequest, stream roachpb.Internal_TenantSettingsServer, +) error { + if m.tenantSettingsFn == nil { + return stream.Send(&roachpb.TenantSettingsEvent{ + Precedence: roachpb.SpecificTenantOverrides, + Incremental: false, + Overrides: nil, + }) + } + return m.tenantSettingsFn(req, stream) +} + func (*mockServer) ResetQuorum( context.Context, *roachpb.ResetQuorumRequest, ) (*roachpb.ResetQuorumResponse, error) { @@ -112,12 +126,6 @@ func (m *mockServer) UpdateSystemSpanConfigs( panic("unimplemented") } -func (m *mockServer) TenantSettings( - *roachpb.TenantSettingsRequest, roachpb.Internal_TenantSettingsServer, -) error { - panic("unimplemented") -} - func gossipEventForClusterID(clusterID uuid.UUID) *roachpb.GossipSubscriptionEvent { return &roachpb.GossipSubscriptionEvent{ Key: gossip.KeyClusterID, diff --git a/pkg/ccl/kvccl/kvtenantccl/setting_overrides.go b/pkg/ccl/kvccl/kvtenantccl/setting_overrides.go new file mode 100644 index 000000000000..6cea5b27687a --- /dev/null +++ b/pkg/ccl/kvccl/kvtenantccl/setting_overrides.go @@ -0,0 +1,151 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package kvtenantccl + +import ( + "context" + "io" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/errors/errorspb" +) + +// runTenantSettingsSubscription listens for tenant setting override changes. +// It closes the given channel once the initial set of overrides were obtained. +// Exits when the context is done. +func (c *Connector) runTenantSettingsSubscription(ctx context.Context, startupCh chan struct{}) { + for ctx.Err() == nil { + client, err := c.getClient(ctx) + if err != nil { + continue + } + stream, err := client.TenantSettings(ctx, &roachpb.TenantSettingsRequest{ + TenantID: c.tenantID, + }) + if err != nil { + log.Warningf(ctx, "error issuing TenantSettings RPC: %v", err) + c.tryForgetClient(ctx, client) + continue + } + for firstEventInStream := true; ; firstEventInStream = false { + e, err := stream.Recv() + if err != nil { + if err == io.EOF { + break + } + // Soft RPC error. Drop client and retry. + log.Warningf(ctx, "error consuming TenantSettings RPC: %v", err) + c.tryForgetClient(ctx, client) + break + } + if e.Error != (errorspb.EncodedError{}) { + // Hard logical error. We expect io.EOF next. + log.Errorf(ctx, "error consuming TenantSettings RPC: %v", e.Error) + continue + } + + if err := c.processSettingsEvent(e, firstEventInStream); err != nil { + log.Errorf(ctx, "error processing tenant settings event: %v", err) + _ = stream.CloseSend() + c.tryForgetClient(ctx, client) + break + } + + // Signal that startup is complete once we receive an event. + if startupCh != nil { + close(startupCh) + startupCh = nil + } + } + } +} + +// processSettingsEvent updates the setting overrides based on the event. +func (c *Connector) processSettingsEvent( + e *roachpb.TenantSettingsEvent, firstEventInStream bool, +) error { + if firstEventInStream && e.Incremental { + return errors.Newf("first event must not be Incremental") + } + c.settingsMu.Lock() + defer c.settingsMu.Unlock() + + var m map[string]settings.EncodedValue + switch e.Precedence { + case roachpb.AllTenantsOverrides: + m = c.settingsMu.allTenantOverrides + case roachpb.SpecificTenantOverrides: + m = c.settingsMu.specificOverrides + default: + return errors.Newf("unknown precedence value %d", e.Precedence) + } + + // If the event is not incremental, clear the map. + if !e.Incremental { + for k := range m { + delete(m, k) + } + } + // Merge in the override changes. + for _, o := range e.Overrides { + if o.Value == (settings.EncodedValue{}) { + // Empty value indicates that the override is removed. + delete(m, o.Name) + } else { + m[o.Name] = o.Value + } + } + + // Do a non-blocking send on the notification channel (if it is not nil). This + // is a buffered channel and if it already contains a message, there's no + // point in sending a duplicate notification. + select { + case c.settingsMu.notifyCh <- struct{}{}: + default: + } + + return nil +} + +// RegisterOverridesChannel is part of the settingswatcher.OverridesMonitor +// interface. +func (c *Connector) RegisterOverridesChannel() <-chan struct{} { + c.settingsMu.Lock() + defer c.settingsMu.Unlock() + if c.settingsMu.notifyCh != nil { + panic(errors.AssertionFailedf("multiple calls not supported")) + } + ch := make(chan struct{}, 1) + // Send an initial message on the channel. + ch <- struct{}{} + c.settingsMu.notifyCh = ch + return ch +} + +// Overrides is part of the settingswatcher.OverridesMonitor interface. +func (c *Connector) Overrides() map[string]settings.EncodedValue { + // We could be more efficient here, but we expect this function to be called + // only when there are changes (which should be rare). + res := make(map[string]settings.EncodedValue) + c.settingsMu.Lock() + defer c.settingsMu.Unlock() + // First copy the all-tenant overrides. + for name, val := range c.settingsMu.allTenantOverrides { + res[name] = val + } + // Then copy the specific overrides (which can overwrite some all-tenant + // overrides). + for name, val := range c.settingsMu.specificOverrides { + res[name] = val + } + return res +} diff --git a/pkg/ccl/kvccl/kvtenantccl/setting_overrides_test.go b/pkg/ccl/kvccl/kvtenantccl/setting_overrides_test.go new file mode 100644 index 000000000000..1bfb35e32ea5 --- /dev/null +++ b/pkg/ccl/kvccl/kvtenantccl/setting_overrides_test.go @@ -0,0 +1,190 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package kvtenantccl + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/netutil" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/stretchr/testify/require" +) + +// TestConnectorSettingOverrides tests Connector's role as a +// settingswatcher.OverridesMonitor. +func TestConnectorSettingOverrides(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + s := rpc.NewServer(rpcContext) + + tenantID := roachpb.MakeTenantID(5) + gossipSubFn := func(req *roachpb.GossipSubscriptionRequest, stream roachpb.Internal_GossipSubscriptionServer) error { + return stream.Send(gossipEventForClusterID(rpcContext.ClusterID.Get())) + } + eventCh := make(chan *roachpb.TenantSettingsEvent) + defer close(eventCh) + settingsFn := func(req *roachpb.TenantSettingsRequest, stream roachpb.Internal_TenantSettingsServer) error { + if req.TenantID != tenantID { + t.Errorf("invalid tenantID %s (expected %s)", req.TenantID, tenantID) + } + for event := range eventCh { + if err := stream.Send(event); err != nil { + return err + } + } + return nil + } + roachpb.RegisterInternalServer(s, &mockServer{ + gossipSubFn: gossipSubFn, + tenantSettingsFn: settingsFn, + }) + ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr) + require.NoError(t, err) + + cfg := kvtenant.ConnectorConfig{ + TenantID: tenantID, + AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()), + RPCContext: rpcContext, + RPCRetryOptions: rpcRetryOpts, + } + addrs := []string{ln.Addr().String()} + c := NewConnector(cfg, addrs) + + // Start should block until the first TenantSettings response. + startedC := make(chan error) + go func() { + startedC <- c.Start(ctx) + }() + select { + case err := <-startedC: + t.Fatalf("Start unexpectedly completed with err=%v", err) + case <-time.After(10 * time.Millisecond): + } + + ch := c.RegisterOverridesChannel() + // We should always get an initial notification. + waitForSettings(t, ch) + + ev := &roachpb.TenantSettingsEvent{ + Precedence: 1, + Incremental: false, + Overrides: nil, + } + eventCh <- ev + require.NoError(t, <-startedC) + + waitForSettings(t, ch) + expectSettings(t, c, "foo=default bar=default baz=default") + + st := func(name, val string) roachpb.TenantSetting { + return roachpb.TenantSetting{ + Name: name, + Value: settings.EncodedValue{Value: val}, + } + } + + // Set some all-tenant overrides. + ev = &roachpb.TenantSettingsEvent{ + Precedence: roachpb.AllTenantsOverrides, + Incremental: true, + Overrides: []roachpb.TenantSetting{st("foo", "all"), st("bar", "all")}, + } + eventCh <- ev + waitForSettings(t, ch) + expectSettings(t, c, "foo=all bar=all baz=default") + + // Set some tenant-specific overrides, with all-tenant overlap. + ev = &roachpb.TenantSettingsEvent{ + Precedence: roachpb.SpecificTenantOverrides, + Incremental: true, + Overrides: []roachpb.TenantSetting{st("foo", "specific"), st("baz", "specific")}, + } + eventCh <- ev + waitForSettings(t, ch) + expectSettings(t, c, "foo=specific bar=all baz=specific") + + // Remove an all-tenant override that has a specific override. + ev = &roachpb.TenantSettingsEvent{ + Precedence: roachpb.AllTenantsOverrides, + Incremental: true, + Overrides: []roachpb.TenantSetting{st("foo", "")}, + } + eventCh <- ev + waitForSettings(t, ch) + expectSettings(t, c, "foo=specific bar=all baz=specific") + + // Remove a specific override. + ev = &roachpb.TenantSettingsEvent{ + Precedence: roachpb.SpecificTenantOverrides, + Incremental: true, + Overrides: []roachpb.TenantSetting{st("foo", "")}, + } + eventCh <- ev + waitForSettings(t, ch) + expectSettings(t, c, "foo=default bar=all baz=specific") + + // Non-incremental change to all-tenants override. + ev = &roachpb.TenantSettingsEvent{ + Precedence: roachpb.AllTenantsOverrides, + Incremental: true, + Overrides: []roachpb.TenantSetting{st("bar", "all")}, + } + eventCh <- ev + waitForSettings(t, ch) + expectSettings(t, c, "foo=default bar=all baz=specific") +} + +func waitForSettings(t *testing.T, ch <-chan struct{}) { + t.Helper() + select { + case <-ch: + return + case <-time.After(10 * time.Second): + t.Fatalf("waitForSettings timed out") + } +} +func expectSettings(t *testing.T, c *Connector, exp string) { + t.Helper() + vars := []string{"foo", "bar", "baz"} + values := make(map[string]string) + for i := range vars { + values[vars[i]] = "default" + } + overrides := c.Overrides() + for _, v := range vars { + if val, ok := overrides[v]; ok { + values[v] = val.Value + } + } + var strs []string + for _, v := range vars { + strs = append(strs, fmt.Sprintf("%s=%s", v, values[v])) + } + str := strings.Join(strs, " ") + if str != exp { + t.Errorf("expected: %s got: %s", exp, str) + } +} diff --git a/pkg/kv/kvclient/kvtenant/BUILD.bazel b/pkg/kv/kvclient/kvtenant/BUILD.bazel index bf94b129ebea..21b7fa616306 100644 --- a/pkg/kv/kvclient/kvtenant/BUILD.bazel +++ b/pkg/kv/kvclient/kvtenant/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/rpc", "//pkg/rpc/nodedialer", "//pkg/server/serverpb", + "//pkg/server/settingswatcher", "//pkg/spanconfig", "//pkg/util/log", "//pkg/util/retry", diff --git a/pkg/kv/kvclient/kvtenant/connector.go b/pkg/kv/kvclient/kvtenant/connector.go index a3b0b41b8aac..6fc4796e8e44 100644 --- a/pkg/kv/kvclient/kvtenant/connector.go +++ b/pkg/kv/kvclient/kvtenant/connector.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/server/settingswatcher" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -73,6 +74,9 @@ type Connector interface { // KVAccessor provides access to the subset of the cluster's span configs // applicable to secondary tenants. spanconfig.KVAccessor + + // OverridesMonitor provides access to tenant cluster setting overrides. + settingswatcher.OverridesMonitor } // TokenBucketProvider supplies an endpoint (to tenants) for the TokenBucket API @@ -86,13 +90,14 @@ type TokenBucketProvider interface { // ConnectorConfig encompasses the configuration required to create a Connector. type ConnectorConfig struct { + TenantID roachpb.TenantID AmbientCtx log.AmbientContext RPCContext *rpc.Context RPCRetryOptions retry.Options DefaultZoneConfig *zonepb.ZoneConfig } -// ConnectorFactory constructs a new tenant Connector from the provide network +// ConnectorFactory constructs a new tenant Connector from the provided network // addresses pointing to KV nodes. type ConnectorFactory interface { NewConnector(cfg ConnectorConfig, addrs []string) (Connector, error) diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index b19f1df76e20..953c9161e929 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2730,7 +2730,7 @@ message TenantSettingsEvent { // - the changed overrides since the last event for the precedence if // Incremental is true (removed overrides have empty RawValue and ValueType // fields). - repeated TenantSetting overrides = 3; + repeated TenantSetting overrides = 3 [(gogoproto.nullable) = false]; // If non-nil, the other fields will be empty and this will be the final event // sent on the stream before it is terminated. diff --git a/pkg/rpc/auth_tenant.go b/pkg/rpc/auth_tenant.go index 56078743ebad..d592eef66f5d 100644 --- a/pkg/rpc/auth_tenant.go +++ b/pkg/rpc/auth_tenant.go @@ -61,6 +61,9 @@ func (a tenantAuthorizer) authorize( case "/cockroach.roachpb.Internal/TokenBucket": return a.authTokenBucket(tenID, req.(*roachpb.TokenBucketRequest)) + case "/cockroach.roachpb.Internal/TenantSettings": + return a.authTenantSettings(tenID, req.(*roachpb.TenantSettingsRequest)) + case "/cockroach.rpc.Heartbeat/Ping": return nil // no restriction to usage of this endpoint by tenants @@ -253,6 +256,20 @@ func (a tenantAuthorizer) authTokenBucket( return nil } +// authTenantSettings authorizes the provided tenant to invoke the +// TenantSettings RPC with the provided args. +func (a tenantAuthorizer) authTenantSettings( + tenID roachpb.TenantID, args *roachpb.TenantSettingsRequest, +) error { + if !args.TenantID.IsSet() { + return authErrorf("tenant settings request with unspecified tenant not permitted") + } + if args.TenantID != tenID { + return authErrorf("tenant settings request for tenant %s not permitted", args.TenantID) + } + return nil +} + // authGetSpanConfigs authorizes the provided tenant to invoke the // GetSpanConfigs RPC with the provided args. func (a tenantAuthorizer) authGetSpanConfigs( diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 177a0679720c..69980be305da 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -220,6 +220,7 @@ go_library( "@com_github_cockroachdb_circuitbreaker//:circuitbreaker", "@com_github_cockroachdb_cmux//:cmux", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_errors//errorspb", "@com_github_cockroachdb_errors//oserror", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_pebble//:pebble", diff --git a/pkg/server/node.go b/pkg/server/node.go index 88f519745729..dc67f92245f0 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -55,6 +55,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/errors/errorspb" "github.com/cockroachdb/logtags" "github.com/cockroachdb/redact" "google.golang.org/grpc/codes" @@ -1379,7 +1380,26 @@ func (n *Node) GossipSubscription( func (n *Node) TenantSettings( args *roachpb.TenantSettingsRequest, stream roachpb.Internal_TenantSettingsServer, ) error { - return errors.AssertionFailedf("not implemented") + ctx := n.storeCfg.AmbientCtx.AnnotateCtx(stream.Context()) + ctxDone := ctx.Done() + + // TODO(radu): implement this. For now we just respond with an initial event + // so the tenant can initialize. + e := &roachpb.TenantSettingsEvent{ + Precedence: roachpb.SpecificTenantOverrides, + Incremental: false, + Overrides: nil, + Error: errorspb.EncodedError{}, + } + if err := stream.Send(e); err != nil { + return err + } + select { + case <-ctxDone: + return ctx.Err() + case <-n.stopper.ShouldQuiesce(): + return stop.ErrUnavailable + } } // Join implements the roachpb.InternalServer service. This is the diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index f235af0895f8..652adcb50ed0 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -949,9 +949,18 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { reporter.TestingKnobs = &cfg.TestingKnobs.Server.(*TestingKnobs).DiagnosticsTestingKnobs } - settingsWatcher := settingswatcher.New( - cfg.clock, codec, cfg.Settings, cfg.rangeFeedFactory, cfg.stopper, cfg.settingsStorage, - ) + var settingsWatcher *settingswatcher.SettingsWatcher + if codec.ForSystemTenant() { + settingsWatcher = settingswatcher.New( + cfg.clock, codec, cfg.Settings, cfg.rangeFeedFactory, cfg.stopper, cfg.settingsStorage, + ) + } else { + // Create the tenant settings watcher, using the tenant connector as the + // overrides monitor. + settingsWatcher = settingswatcher.NewWithOverrides( + cfg.clock, codec, cfg.Settings, cfg.rangeFeedFactory, cfg.stopper, cfg.tenantConnect, cfg.settingsStorage, + ) + } return &SQLServer{ ambientCtx: cfg.BaseConfig.AmbientCtx, diff --git a/pkg/server/settingswatcher/BUILD.bazel b/pkg/server/settingswatcher/BUILD.bazel index 909873ad9436..17d1f6a0fe99 100644 --- a/pkg/server/settingswatcher/BUILD.bazel +++ b/pkg/server/settingswatcher/BUILD.bazel @@ -39,10 +39,9 @@ go_test( "main_test.go", "row_decoder_external_test.go", "settings_watcher_external_test.go", - "settings_watcher_test.go", ], - embed = [":settingswatcher"], deps = [ + ":settingswatcher", "//pkg/base", "//pkg/keys", "//pkg/kv", diff --git a/pkg/server/settingswatcher/overrides.go b/pkg/server/settingswatcher/overrides.go index 88b22c24ea48..ff04e66d9858 100644 --- a/pkg/server/settingswatcher/overrides.go +++ b/pkg/server/settingswatcher/overrides.go @@ -19,9 +19,10 @@ import "github.com/cockroachdb/cockroach/pkg/settings" // Current() to retrieve the updated list of overrides when a message is // received. type OverridesMonitor interface { - // NotifyCh returns a channel that receives a message any time the current set - // of overrides changes. - NotifyCh() <-chan struct{} + // RegisterOverridesChannel returns a channel that receives a message + // any time the current set of overrides changes. + // The channel receives an initial event immediately. + RegisterOverridesChannel() <-chan struct{} // Overrides retrieves the current set of setting overrides, as a map from // setting key to EncodedValue. Any settings that are present must be set to diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index 786937fb7a04..be8c4f9c586e 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -139,7 +139,7 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { // Set up a worker to watch the monitor. if err := s.stopper.RunAsyncTask(ctx, "setting-overrides", func(ctx context.Context) { - overridesCh := s.overridesMonitor.NotifyCh() + overridesCh := s.overridesMonitor.RegisterOverridesChannel() for { select { case <-overridesCh: @@ -355,3 +355,8 @@ func (s *SettingsWatcher) resetUpdater() { defer s.mu.Unlock() s.mu.updater = s.settings.MakeUpdater() } + +// SetTestingKnobs is used by tests to set testing knobs. +func (s *SettingsWatcher) SetTestingKnobs(knobs *rangefeedcache.TestingKnobs) { + s.testingWatcherKnobs = knobs +} diff --git a/pkg/server/settingswatcher/settings_watcher_external_test.go b/pkg/server/settingswatcher/settings_watcher_external_test.go index a19cfc3b953f..4f83a59d8e26 100644 --- a/pkg/server/settingswatcher/settings_watcher_external_test.go +++ b/pkg/server/settingswatcher/settings_watcher_external_test.go @@ -13,6 +13,7 @@ package settingswatcher_test import ( "bytes" "context" + "sync/atomic" "testing" "time" @@ -20,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/settingswatcher" "github.com/cockroachdb/cockroach/pkg/settings" @@ -98,7 +100,7 @@ func TestSettingWatcherOnTenant(t *testing.T) { return len(rows) } checkSettingsValuesMatch := func(a, b *cluster.Settings) error { - return settingswatcher.CheckSettingsValuesMatch(t, a, b) + return CheckSettingsValuesMatch(t, a, b) } checkStoredValuesMatch := func(expected []roachpb.KeyValue) error { got := filterSystemOnly(getSourceClusterRows()) @@ -342,8 +344,8 @@ func (m *testingOverrideMonitor) unset(key string) { delete(m.mu.overrides, key) } -// NotifyCh is part of the settingswatcher.OverridesMonitor interface. -func (m *testingOverrideMonitor) NotifyCh() <-chan struct{} { +// RegisterOverridesChannel is part of the settingswatcher.OverridesMonitor interface. +func (m *testingOverrideMonitor) RegisterOverridesChannel() <-chan struct{} { return m.ch } @@ -357,3 +359,76 @@ func (m *testingOverrideMonitor) Overrides() map[string]settings.EncodedValue { } return res } + +// Test that an error occurring during processing of the +// rangefeedcache.Watcher can be recovered after a permanent +// rangefeed failure. +func TestOverflowRestart(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + sideSettings := cluster.MakeTestingClusterSettings() + w := settingswatcher.New( + s.Clock(), + s.ExecutorConfig().(sql.ExecutorConfig).Codec, + sideSettings, + s.RangeFeedFactory().(*rangefeed.Factory), + s.Stopper(), + nil, + ) + var exitCalled int64 // accessed with atomics + errCh := make(chan error) + w.SetTestingKnobs(&rangefeedcache.TestingKnobs{ + PreExit: func() { atomic.AddInt64(&exitCalled, 1) }, + ErrorInjectionCh: errCh, + }) + require.NoError(t, w.Start(ctx)) + tdb := sqlutils.MakeSQLRunner(sqlDB) + // Shorten the closed timestamp duration as a cheeky way to check the + // checkpointing code while also speeding up the test. + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10 ms'") + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10 ms'") + + checkSettings := func() { + testutils.SucceedsSoon(t, func() error { + return CheckSettingsValuesMatch(t, s.ClusterSettings(), sideSettings) + }) + } + checkExits := func(exp int64) { + require.Equal(t, exp, atomic.LoadInt64(&exitCalled)) + } + waitForExits := func(exp int64) { + require.Eventually(t, func() bool { + return atomic.LoadInt64(&exitCalled) == exp + }, time.Minute, time.Millisecond) + } + + checkSettings() + tdb.Exec(t, "SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget = '1m'") + checkSettings() + checkExits(0) + errCh <- errors.New("boom") + waitForExits(1) + tdb.Exec(t, "SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget = '2s'") + checkSettings() + checkExits(1) +} + +// CheckSettingsValuesMatch is a test helper function to return an error when +// two settings do not match. It generally gets used with SucceeedsSoon. +func CheckSettingsValuesMatch(t *testing.T, a, b *cluster.Settings) error { + for _, k := range settings.Keys(false /* forSystemTenant */) { + s, ok := settings.Lookup(k, settings.LookupForLocalAccess, false /* forSystemTenant */) + require.True(t, ok) + if s.Class() == settings.SystemOnly { + continue + } + if av, bv := s.String(&a.SV), s.String(&b.SV); av != bv { + return errors.Errorf("values do not match for %s: %s != %s", k, av, bv) + } + } + return nil +} diff --git a/pkg/server/settingswatcher/settings_watcher_test.go b/pkg/server/settingswatcher/settings_watcher_test.go deleted file mode 100644 index f57f7145a70b..000000000000 --- a/pkg/server/settingswatcher/settings_watcher_test.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package settingswatcher - -import ( - "context" - "sync/atomic" - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" - "github.com/cockroachdb/cockroach/pkg/settings" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/require" -) - -// Test that an error occurring during processing of the -// rangefeedcache.Watcher can be recovered after a permanent -// rangefeed failure. -func TestOverflowRestart(t *testing.T) { - defer leaktest.AfterTest(t)() - - ctx := context.Background() - s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop(ctx) - - sideSettings := cluster.MakeTestingClusterSettings() - w := New( - s.Clock(), - s.ExecutorConfig().(sql.ExecutorConfig).Codec, - sideSettings, - s.RangeFeedFactory().(*rangefeed.Factory), - s.Stopper(), - nil, - ) - var exitCalled int64 // accessed with atomics - errCh := make(chan error) - w.testingWatcherKnobs = &rangefeedcache.TestingKnobs{ - PreExit: func() { atomic.AddInt64(&exitCalled, 1) }, - ErrorInjectionCh: errCh, - } - require.NoError(t, w.Start(ctx)) - tdb := sqlutils.MakeSQLRunner(sqlDB) - // Shorten the closed timestamp duration as a cheeky way to check the - // checkpointing code while also speeding up the test. - tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10 ms'") - tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10 ms'") - - checkSettings := func() { - testutils.SucceedsSoon(t, func() error { - return CheckSettingsValuesMatch(t, s.ClusterSettings(), sideSettings) - }) - } - checkExits := func(exp int64) { - require.Equal(t, exp, atomic.LoadInt64(&exitCalled)) - } - waitForExits := func(exp int64) { - require.Eventually(t, func() bool { - return atomic.LoadInt64(&exitCalled) == exp - }, time.Minute, time.Millisecond) - } - - checkSettings() - tdb.Exec(t, "SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget = '1m'") - checkSettings() - checkExits(0) - errCh <- errors.New("boom") - waitForExits(1) - tdb.Exec(t, "SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget = '2s'") - checkSettings() - checkExits(1) -} - -// CheckSettingsValuesMatch is a test helper function to return an error when -// two settings do not match. It generally gets used with SucceeedsSoon. -func CheckSettingsValuesMatch(t *testing.T, a, b *cluster.Settings) error { - for _, k := range settings.Keys(false /* forSystemTenant */) { - s, ok := settings.Lookup(k, settings.LookupForLocalAccess, false /* forSystemTenant */) - require.True(t, ok) - if s.Class() == settings.SystemOnly { - continue - } - if av, bv := s.String(&a.SV), s.String(&b.SV); av != bv { - return errors.Errorf("values do not match for %s: %s != %s", k, av, bv) - } - } - return nil -} diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 4341fa7d8be0..27a012e2b7db 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -383,6 +383,7 @@ func makeTenantSQLServerArgs( rpcRetryOptions := base.DefaultRetryOptions() tcCfg := kvtenant.ConnectorConfig{ + TenantID: sqlCfg.TenantID, AmbientCtx: baseCfg.AmbientCtx, RPCContext: rpcContext, RPCRetryOptions: rpcRetryOptions,