Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

settings,spanconfig: introduce a protobuf setting type #92466

Merged
merged 1 commit into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions pkg/kv/kvserver/client_spanconfigs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,58 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) {
})
}

// TestFallbackSpanConfigOverride ensures that
// spanconfig.store.fallback_config_override works as expected.
func TestFallbackSpanConfigOverride(t *testing.T) {
defer leaktest.AfterTest(t)()

st := cluster.MakeTestingClusterSettings()
spanConfigStore := spanconfigstore.New(roachpb.TestingDefaultSpanConfig(), st, nil)
mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore)

ctx := context.Background()
args := base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
DisableGCQueue: true,
},
SpanConfig: &spanconfig.TestingKnobs{
StoreKVSubscriberOverride: mockSubscriber,
},
},
}
s, _, _ := serverutils.StartServer(t, args)
defer s.Stopper().Stop(context.Background())

_, err := s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", nil,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
`SET CLUSTER SETTING spanconfig.store.enabled = true`)
require.NoError(t, err)

key, err := s.ScratchRange()
require.NoError(t, err)
store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)
repl := store.LookupReplica(keys.MustAddr(key))
span := repl.Desc().RSpan().AsRawSpanWithNoLocals()

conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3}
spanconfigstore.FallbackConfigOverride.Override(ctx, &st.SV, &conf)

require.NotNil(t, mockSubscriber.callback)
mockSubscriber.callback(ctx, span) // invoke the callback
testutils.SucceedsSoon(t, func() error {
repl := store.LookupReplica(keys.MustAddr(key))
gotConfig := repl.SpanConfig()
if !gotConfig.Equal(conf) {
return errors.Newf("expected config=%s, got config=%s", conf.String(), gotConfig.String())
}
return nil
})
}

type mockSpanConfigSubscriber struct {
callback func(ctx context.Context, config roachpb.Span)
spanconfig.Store
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2129,6 +2129,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
}
}
})

// We also want to do it when the fallback config setting is changed.
spanconfigstore.FallbackConfigOverride.SetOnChange(&s.ClusterSettings().SV, func(ctx context.Context) {
s.applyAllFromSpanConfigStore(ctx)
})
}

if !s.cfg.TestingKnobs.DisableAutomaticLeaseRenewal {
Expand Down
4 changes: 4 additions & 0 deletions pkg/settings/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"float.go",
"int.go",
"masked.go",
"protobuf.go",
"registry.go",
"setting.go",
"string.go",
Expand All @@ -29,10 +30,13 @@ go_library(
deps = [
"//pkg/util/buildutil",
"//pkg/util/humanizeutil",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_cockroachdb_redact//interfaces",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_gogo_protobuf//proto",
],
)

Expand Down
216 changes: 216 additions & 0 deletions pkg/settings/protobuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package settings

import (
"context"
"reflect"
"strings"

"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
)

// ProtobufSetting is the interface of a setting variable that will be updated
// automatically when the corresponding cluster-wide setting of type "protobuf"
// is updated. The proto message is held in memory, the byte representation
// stored in system.settings, and JSON presentation used when accepting input
// and rendering (just through SHOW CLUSTER SETTING <setting-name>, the raw form
// is visible when looking directly at system.settings).
type ProtobufSetting struct {
defaultValue protoutil.Message
validateFn func(*Values, protoutil.Message) error
common
}

var _ internalSetting = &ProtobufSetting{}

// String returns the string representation of the setting's current value.
func (s *ProtobufSetting) String(sv *Values) string {
p := s.Get(sv)
json, err := s.MarshalToJSON(p)
if err != nil {
panic(errors.Wrapf(err, "marshaling %s: %+v", proto.MessageName(p), p))
}
return json
}

// Encoded returns the encoded value of the current value of the setting.
func (s *ProtobufSetting) Encoded(sv *Values) string {
p := s.Get(sv)
return EncodeProtobuf(p)
}

// EncodedDefault returns the encoded value of the default value of the setting.
func (s *ProtobufSetting) EncodedDefault() string {
return EncodeProtobuf(s.defaultValue)
}

// DecodeToString decodes and renders an encoded value.
func (s *ProtobufSetting) DecodeToString(encoded string) (string, error) {
message, err := s.DecodeValue(encoded)
if err != nil {
return "", err
}
return s.MarshalToJSON(message)
}

// Typ returns the short (1 char) string denoting the type of setting.
func (*ProtobufSetting) Typ() string {
return "p"
}

// DecodeValue decodes the value into a protobuf.
func (s *ProtobufSetting) DecodeValue(encoded string) (protoutil.Message, error) {
p, err := newProtoMessage(proto.MessageName(s.defaultValue))
if err != nil {
return nil, err
}

if err := protoutil.Unmarshal([]byte(encoded), p); err != nil {
return nil, err
}
return p, nil
}

// Default returns default value for setting.
func (s *ProtobufSetting) Default() protoutil.Message {
return s.defaultValue
}

// Get retrieves the protobuf value in the setting.
func (s *ProtobufSetting) Get(sv *Values) protoutil.Message {
loaded := sv.getGeneric(s.slot)
if loaded == nil {
return s.defaultValue
}
return loaded.(protoutil.Message)
}

// Validate that a value conforms with the validation function.
func (s *ProtobufSetting) Validate(sv *Values, p protoutil.Message) error {
if s.validateFn == nil {
return nil // nothing to do
}
return s.validateFn(sv, p)
}

// Override sets the setting to the given value, assuming it passes validation.
func (s *ProtobufSetting) Override(ctx context.Context, sv *Values, p protoutil.Message) {
_ = s.set(ctx, sv, p)
}

func (s *ProtobufSetting) set(ctx context.Context, sv *Values, p protoutil.Message) error {
if err := s.Validate(sv, p); err != nil {
return err
}
if s.Get(sv) != p {
sv.setGeneric(ctx, s.slot, p)
}
return nil
}

func (s *ProtobufSetting) setToDefault(ctx context.Context, sv *Values) {
if err := s.set(ctx, sv, s.defaultValue); err != nil {
panic(err)
}
}

// WithPublic sets public visibility and can be chained.
func (s *ProtobufSetting) WithPublic() *ProtobufSetting {
s.SetVisibility(Public)
return s
}

// MarshalToJSON returns a JSON representation of the protobuf.
func (s *ProtobufSetting) MarshalToJSON(p protoutil.Message) (string, error) {
jsonEncoder := jsonpb.Marshaler{EmitDefaults: false}
return jsonEncoder.MarshalToString(p)
}

// UnmarshalFromJSON unmarshals a protobuf from a json representation.
func (s *ProtobufSetting) UnmarshalFromJSON(jsonEncoded string) (protoutil.Message, error) {
p, err := newProtoMessage(proto.MessageName(s.defaultValue))
if err != nil {
return nil, err
}

json := &jsonpb.Unmarshaler{}
if err := json.Unmarshal(strings.NewReader(jsonEncoded), p); err != nil {
return nil, errors.Wrapf(err, "unmarshaling json to %s", proto.MessageName(p))
}
return p, nil
}

// RegisterProtobufSetting defines a new setting with type protobuf.
func RegisterProtobufSetting(
class Class, key, desc string, defaultValue protoutil.Message,
) *ProtobufSetting {
return RegisterValidatedProtobufSetting(class, key, desc, defaultValue, nil)
}

// RegisterValidatedProtobufSetting defines a new setting with type protobuf
// with a validation function.
func RegisterValidatedProtobufSetting(
class Class,
key, desc string,
defaultValue protoutil.Message,
validateFn func(*Values, protoutil.Message) error,
) *ProtobufSetting {
if validateFn != nil {
if err := validateFn(nil, defaultValue); err != nil {
panic(errors.Wrap(err, "invalid default"))
}
}
setting := &ProtobufSetting{
defaultValue: defaultValue,
validateFn: validateFn,
}

// By default, all protobuf settings are considered to contain PII and are
// thus non-reportable (to exclude them from telemetry reports).
setting.SetReportable(false)
register(class, key, desc, setting)
return setting
}

// Defeat the unused linter.
var _ = (*ProtobufSetting).Default
var _ = (*ProtobufSetting).WithPublic

// newProtoMessage creates a new protocol message object, given its fully
// qualified name.
func newProtoMessage(name string) (protoutil.Message, error) {
// Get the reflected type of the protocol message.
rt := proto.MessageType(name)
if rt == nil {
return nil, errors.Newf("unknown proto message type %s", name)
}

// If the message is known, we should get the pointer to our message.
if rt.Kind() != reflect.Ptr {
return nil, errors.AssertionFailedf(
"expected ptr to message, got %s instead", rt.Kind().String())
}

// Construct message of appropriate type, through reflection.
rv := reflect.New(rt.Elem())
msg, ok := rv.Interface().(protoutil.Message)
if !ok {
// Just to be safe.
return nil, errors.AssertionFailedf(
"unexpected proto type for %s; expected protoutil.Message, got %T",
name, rv.Interface())
}
return msg, nil
}
17 changes: 17 additions & 0 deletions pkg/settings/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"strconv"
"time"

"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
)

// EncodeDuration encodes a duration in the format of EncodedValue.Value.
Expand All @@ -38,6 +40,15 @@ func EncodeFloat(f float64) string {
return strconv.FormatFloat(f, 'G', -1, 64)
}

// EncodeProtobuf encodes a protobuf in the format of EncodedValue.Value.
func EncodeProtobuf(p protoutil.Message) string {
data := make([]byte, p.Size())
if _, err := p.MarshalTo(data); err != nil {
panic(errors.Wrapf(err, "encoding %s: %+v", proto.MessageName(p), p))
}
return string(data)
}

type updater struct {
sv *Values
m map[string]struct{}
Expand Down Expand Up @@ -91,6 +102,12 @@ func (u updater) Set(ctx context.Context, key string, value EncodedValue) error
switch setting := d.(type) {
case *StringSetting:
return setting.set(ctx, u.sv, value.Value)
case *ProtobufSetting:
p, err := setting.DecodeValue(value.Value)
if err != nil {
return err
}
return setting.set(ctx, u.sv, p)
case *BoolSetting:
b, err := setting.DecodeValue(value.Value)
if err != nil {
Expand Down
Loading