Skip to content

Commit

Permalink
settings,spanconfig: introduce a protobuf setting type
Browse files Browse the repository at this point in the history
For this setting type:
- the protoutil.Message is held in memory,
- the byte representation is stored in system.settings, and
- the json representation is used when accepting input and rendering
  state (through SHOW CLUSTER SETTING <setting-name>, the raw form is
  visible when looking directly at system.settings)

We also use this setting type to support power a
spanconfig.store.fallback_config_override, which overrides the fallback
config used for ranges with no explicit span configs set. Previously we
used a hardcoded value -- this makes it a bit more configurable. This is
a partial and backportable workaround (read: hack) for #91238 and
\#91239. In an internal escalation we observed "orphaned" ranges from
dropped tables that were not being being referenced by span configs (by
virtue of them originating from now-dropped tables/configs). Typically
ranges of this sort are short-lived, they're emptied out through GC and
merged into LHS neighbors. But if the neighboring ranges are large
enough, or load just high enough, the merge does not occur. For such
orphaned ranges we were using a hardcoded "fallback config", with a
replication factor of three. This made for confusing semantics where if
RANGE DEFAULT was configured to have a replication factor of five, our
replication reports indicated there were under-replicated ranges. This
is partly because replication reports today are powered by zone configs
(thus looking at RANGE DEFAULT) -- this will change shortly as part of
\#89987 where we'll instead consider span config data.
In any case, we were warning users of under-replicated ranges but within
KV we were not taking any action to upreplicate them -- KV was
respecting the hard-coded fallback config. The issues above describe
that we should apply each tenant's RANGE DEFAULT config to all such
orphaned ranges, which is probably the right fix. This was alluded to in
an earlier TODO but is still left for future work.

  // TODO(irfansharif): We're using a static[1] fallback span config
  // here, we could instead have this directly track the host tenant's
  // RANGE DEFAULT config, or go a step further and use the tenant's own
  // RANGE DEFAULT instead if the key is within the tenant's keyspace.
  // We'd have to thread that through the KVAccessor interface by
  // reserving special keys for these default configs.
  //
  // [1]: Modulo the private spanconfig.store.fallback_config_override, which
  //      applies globally.

So this PR instead takes a shortcut -- it makes the static config
configurable through a cluster setting. We can now do the following
which alters what fallback config is applied to orphaned ranges, and in
our example above, force such ranges to also have a replication factor
of five.

  SET CLUSTER SETTING spanconfig.store.fallback_config_override = '
    {
      "gcPolicy": {"ttlSeconds": 3600},
      "numReplicas": 5,
      "rangeMaxBytes": "536870912",
      "rangeMinBytes": "134217728"
    }';

Release note: None
  • Loading branch information
irfansharif committed Nov 25, 2022
1 parent 1a6e9f8 commit b0cfdc9
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 12 deletions.
52 changes: 52 additions & 0 deletions pkg/kv/kvserver/client_spanconfigs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,58 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) {
})
}

// TestFallbackSpanConfigOverride ensures that
// spanconfig.store.fallback_config_override works as expected.
func TestFallbackSpanConfigOverride(t *testing.T) {
defer leaktest.AfterTest(t)()

st := cluster.MakeTestingClusterSettings()
spanConfigStore := spanconfigstore.New(roachpb.TestingDefaultSpanConfig(), st, nil)
mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore)

ctx := context.Background()
args := base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
DisableGCQueue: true,
},
SpanConfig: &spanconfig.TestingKnobs{
StoreKVSubscriberOverride: mockSubscriber,
},
},
}
s, _, _ := serverutils.StartServer(t, args)
defer s.Stopper().Stop(context.Background())

_, err := s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", nil,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
`SET CLUSTER SETTING spanconfig.store.enabled = true`)
require.NoError(t, err)

key, err := s.ScratchRange()
require.NoError(t, err)
store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)
repl := store.LookupReplica(keys.MustAddr(key))
span := repl.Desc().RSpan().AsRawSpanWithNoLocals()

conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3}
spanconfigstore.FallbackConfigOverride.Override(ctx, &st.SV, &conf)

require.NotNil(t, mockSubscriber.callback)
mockSubscriber.callback(ctx, span) // invoke the callback
testutils.SucceedsSoon(t, func() error {
repl := store.LookupReplica(keys.MustAddr(key))
gotConfig := repl.SpanConfig()
if !gotConfig.Equal(conf) {
return errors.Newf("expected config=%s, got config=%s", conf.String(), gotConfig.String())
}
return nil
})
}

type mockSpanConfigSubscriber struct {
callback func(ctx context.Context, config roachpb.Span)
spanconfig.Store
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2129,6 +2129,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
}
}
})

// We also want to do it when the fallback config setting is changed.
spanconfigstore.FallbackConfigOverride.SetOnChange(&s.ClusterSettings().SV, func(ctx context.Context) {
s.applyAllFromSpanConfigStore(ctx)
})
}

if !s.cfg.TestingKnobs.DisableAutomaticLeaseRenewal {
Expand Down
4 changes: 4 additions & 0 deletions pkg/settings/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"float.go",
"int.go",
"masked.go",
"protobuf.go",
"registry.go",
"setting.go",
"string.go",
Expand All @@ -29,10 +30,13 @@ go_library(
deps = [
"//pkg/util/buildutil",
"//pkg/util/humanizeutil",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_cockroachdb_redact//interfaces",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_gogo_protobuf//proto",
],
)

Expand Down
216 changes: 216 additions & 0 deletions pkg/settings/protobuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package settings

import (
"context"
"reflect"
"strings"

"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
)

// ProtobufSetting is the interface of a setting variable that will be updated
// automatically when the corresponding cluster-wide setting of type "protobuf"
// is updated. The proto message is held in memory, the byte representation
// stored in system.settings, and JSON presentation used when accepting input
// and rendering (just through SHOW CLUSTER SETTING <setting-name>, the raw form
// is visible when looking directly at system.settings).
type ProtobufSetting struct {
defaultValue protoutil.Message
validateFn func(*Values, protoutil.Message) error
common
}

var _ internalSetting = &ProtobufSetting{}

// String returns the string representation of the setting's current value.
func (s *ProtobufSetting) String(sv *Values) string {
p := s.Get(sv)
json, err := s.MarshalToJSON(p)
if err != nil {
panic(errors.Wrapf(err, "marshaling %s: %+v", proto.MessageName(p), p))
}
return json
}

// Encoded returns the encoded value of the current value of the setting.
func (s *ProtobufSetting) Encoded(sv *Values) string {
p := s.Get(sv)
return EncodeProtobuf(p)
}

// EncodedDefault returns the encoded value of the default value of the setting.
func (s *ProtobufSetting) EncodedDefault() string {
return EncodeProtobuf(s.defaultValue)
}

// DecodeToString decodes and renders an encoded value.
func (s *ProtobufSetting) DecodeToString(encoded string) (string, error) {
message, err := s.DecodeValue(encoded)
if err != nil {
return "", err
}
return s.MarshalToJSON(message)
}

// Typ returns the short (1 char) string denoting the type of setting.
func (*ProtobufSetting) Typ() string {
return "p"
}

// DecodeValue decodes the value into a protobuf.
func (s *ProtobufSetting) DecodeValue(encoded string) (protoutil.Message, error) {
p, err := newProtoMessage(proto.MessageName(s.defaultValue))
if err != nil {
return nil, err
}

if err := protoutil.Unmarshal([]byte(encoded), p); err != nil {
return nil, err
}
return p, nil
}

// Default returns default value for setting.
func (s *ProtobufSetting) Default() protoutil.Message {
return s.defaultValue
}

// Get retrieves the protobuf value in the setting.
func (s *ProtobufSetting) Get(sv *Values) protoutil.Message {
loaded := sv.getGeneric(s.slot)
if loaded == nil {
return s.defaultValue
}
return loaded.(protoutil.Message)
}

// Validate that a value conforms with the validation function.
func (s *ProtobufSetting) Validate(sv *Values, p protoutil.Message) error {
if s.validateFn == nil {
return nil // nothing to do
}
return s.validateFn(sv, p)
}

// Override sets the setting to the given value, assuming it passes validation.
func (s *ProtobufSetting) Override(ctx context.Context, sv *Values, p protoutil.Message) {
_ = s.set(ctx, sv, p)
}

func (s *ProtobufSetting) set(ctx context.Context, sv *Values, p protoutil.Message) error {
if err := s.Validate(sv, p); err != nil {
return err
}
if s.Get(sv) != p {
sv.setGeneric(ctx, s.slot, p)
}
return nil
}

func (s *ProtobufSetting) setToDefault(ctx context.Context, sv *Values) {
if err := s.set(ctx, sv, s.defaultValue); err != nil {
panic(err)
}
}

// WithPublic sets public visibility and can be chained.
func (s *ProtobufSetting) WithPublic() *ProtobufSetting {
s.SetVisibility(Public)
return s
}

// MarshalToJSON returns a JSON representation of the protobuf.
func (s *ProtobufSetting) MarshalToJSON(p protoutil.Message) (string, error) {
jsonEncoder := jsonpb.Marshaler{EmitDefaults: false}
return jsonEncoder.MarshalToString(p)
}

// UnmarshalFromJSON unmarshals a protobuf from a json representation.
func (s *ProtobufSetting) UnmarshalFromJSON(jsonEncoded string) (protoutil.Message, error) {
p, err := newProtoMessage(proto.MessageName(s.defaultValue))
if err != nil {
return nil, err
}

json := &jsonpb.Unmarshaler{}
if err := json.Unmarshal(strings.NewReader(jsonEncoded), p); err != nil {
return nil, errors.Wrapf(err, "unmarshaling json to %s", proto.MessageName(p))
}
return p, nil
}

// RegisterProtobufSetting defines a new setting with type protobuf.
func RegisterProtobufSetting(
class Class, key, desc string, defaultValue protoutil.Message,
) *ProtobufSetting {
return RegisterValidatedProtobufSetting(class, key, desc, defaultValue, nil)
}

// RegisterValidatedProtobufSetting defines a new setting with type protobuf
// with a validation function.
func RegisterValidatedProtobufSetting(
class Class,
key, desc string,
defaultValue protoutil.Message,
validateFn func(*Values, protoutil.Message) error,
) *ProtobufSetting {
if validateFn != nil {
if err := validateFn(nil, defaultValue); err != nil {
panic(errors.Wrap(err, "invalid default"))
}
}
setting := &ProtobufSetting{
defaultValue: defaultValue,
validateFn: validateFn,
}

// By default, all protobuf settings are considered to contain PII and are
// thus non-reportable (to exclude them from telemetry reports).
setting.SetReportable(false)
register(class, key, desc, setting)
return setting
}

// Defeat the unused linter.
var _ = (*ProtobufSetting).Default
var _ = (*ProtobufSetting).WithPublic

// newProtoMessage creates a new protocol message object, given its fully
// qualified name.
func newProtoMessage(name string) (protoutil.Message, error) {
// Get the reflected type of the protocol message.
rt := proto.MessageType(name)
if rt == nil {
return nil, errors.Newf("unknown proto message type %s", name)
}

// If the message is known, we should get the pointer to our message.
if rt.Kind() != reflect.Ptr {
return nil, errors.AssertionFailedf(
"expected ptr to message, got %s instead", rt.Kind().String())
}

// Construct message of appropriate type, through reflection.
rv := reflect.New(rt.Elem())
msg, ok := rv.Interface().(protoutil.Message)
if !ok {
// Just to be safe.
return nil, errors.AssertionFailedf(
"unexpected proto type for %s; expected protoutil.Message, got %T",
name, rv.Interface())
}
return msg, nil
}
17 changes: 17 additions & 0 deletions pkg/settings/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"strconv"
"time"

"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
)

// EncodeDuration encodes a duration in the format of EncodedValue.Value.
Expand All @@ -38,6 +40,15 @@ func EncodeFloat(f float64) string {
return strconv.FormatFloat(f, 'G', -1, 64)
}

// EncodeProtobuf encodes a protobuf in the format of EncodedValue.Value.
func EncodeProtobuf(p protoutil.Message) string {
data := make([]byte, p.Size())
if _, err := p.MarshalTo(data); err != nil {
panic(errors.Wrapf(err, "encoding %s: %+v", proto.MessageName(p), p))
}
return string(data)
}

type updater struct {
sv *Values
m map[string]struct{}
Expand Down Expand Up @@ -91,6 +102,12 @@ func (u updater) Set(ctx context.Context, key string, value EncodedValue) error
switch setting := d.(type) {
case *StringSetting:
return setting.set(ctx, u.sv, value.Value)
case *ProtobufSetting:
p, err := setting.DecodeValue(value.Value)
if err != nil {
return err
}
return setting.set(ctx, u.sv, p)
case *BoolSetting:
b, err := setting.DecodeValue(value.Value)
if err != nil {
Expand Down
Loading

0 comments on commit b0cfdc9

Please sign in to comment.