Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: spanconfig: integrate SpanConfigBounds with the Store and KVSubscriber #100013

Merged
merged 4 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ go_test(
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigbounds",
"//pkg/spanconfig/spanconfigptsreader",
"//pkg/spanconfig/spanconfigstore",
"//pkg/sql",
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/client_spanconfigs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigbounds"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
Expand All @@ -41,6 +42,7 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) {
spanConfigStore := spanconfigstore.New(
roachpb.TestingDefaultSpanConfig(),
cluster.MakeTestingClusterSettings(),
spanconfigbounds.NewEmptyReader(),
nil,
)
var t0 = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
Expand Down Expand Up @@ -107,7 +109,9 @@ func TestFallbackSpanConfigOverride(t *testing.T) {
defer leaktest.AfterTest(t)()

st := cluster.MakeTestingClusterSettings()
spanConfigStore := spanconfigstore.New(roachpb.TestingDefaultSpanConfig(), st, nil)
spanConfigStore := spanconfigstore.New(
roachpb.TestingDefaultSpanConfig(), st, spanconfigbounds.NewEmptyReader(), nil,
)
var t0 = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
mockSubscriber := newMockSpanConfigSubscriber(t0, spanConfigStore)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@ func (b boolCapValue) SafeFormat(p redact.SafePrinter, verb rune) {
p.Print(redact.Safe(bool(b)))
}

// Unwrap implements the tenantcapabilities.Value interface.
func (b boolCapValue) Unwrap() interface{} { return bool(b) }

// Unwrap implements the tenantcapabilities.Value interface.
func (m *SpanConfigBounds) Unwrap() interface{} { return m }

func (m *SpanConfigBounds) SafeFormat(p redact.SafePrinter, verb rune) {
p.Print(redact.SafeString(m.String()))
}

// boolCap is an accessor struct for boolean capabilities.
type boolCap struct {
cap *bool
Expand Down Expand Up @@ -73,6 +79,26 @@ func (i invertedBoolCap) Set(val interface{}) {
*i.cap = !bval
}

// spanConfigBoundsCap is an accessor struct for SpanConfigBounds that are
// stored on the underlying TenantCapabilities proto.
type spanConfigBoundsCap struct {
cap *SpanConfigBounds
}

// Get implements the tenantcapabilities.Capability interface.
func (s spanConfigBoundsCap) Get() tenantcapabilities.Value {
return s.cap
}

// Set implements the tenantcapabilities.Capability interface.
func (s spanConfigBoundsCap) Set(val interface{}) {
scfgBoundsVal, ok := val.(SpanConfigBounds)
if !ok {
panic(errors.AssertionFailedf("invalid value type: %T", val))
}
*s.cap = scfgBoundsVal
}

// Cap implements the tenantcapabilities.TenantCapabilities interface.
func (t *TenantCapabilities) Cap(
capabilityID tenantcapabilities.CapabilityID,
Expand All @@ -92,6 +118,8 @@ func (t *TenantCapabilities) Cap(
return boolCap{&t.CanViewTSDBMetrics}
case tenantcapabilities.ExemptFromRateLimiting:
return boolCap{&t.ExemptFromRateLimiting}
case tenantcapabilities.TenantSpanConfigBounds:
return spanConfigBoundsCap{t.SpanConfigBounds}

default:
panic(errors.AssertionFailedf("unknown capability: %q", capabilityID.String()))
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigbounds",
"//pkg/spanconfig/spanconfigjob",
"//pkg/spanconfig/spanconfigkvaccessor",
"//pkg/spanconfig/spanconfigkvsubscriber",
Expand Down
20 changes: 11 additions & 9 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/tenantsettingswatcher"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigbounds"
_ "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob" // register jobs declared outside of pkg/sql
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber"
Expand Down Expand Up @@ -623,6 +624,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
keys.SystemSQLCodec, clock, rangeFeedFactory, &cfg.DefaultZoneConfig,
)

tenantCapabilitiesWatcher := tenantcapabilitieswatcher.New(
clock,
rangeFeedFactory,
keys.TenantsTableID,
stopper,
1<<20, /* 1 MB */
tenantCapabilitiesTestingKnobs,
)

var spanConfig struct {
// kvAccessor powers the span configuration RPCs and the host tenant's
// reconciliation job.
Expand Down Expand Up @@ -668,6 +678,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
1<<20, /* 1 MB */
fallbackConf,
cfg.Settings,
spanconfigbounds.NewReader(tenantCapabilitiesWatcher),
spanConfigKnobs,
registry,
)
Expand Down Expand Up @@ -791,15 +802,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
clock, rangeFeedFactory, stopper, st,
)

tenantCapabilitiesWatcher := tenantcapabilitieswatcher.New(
clock,
rangeFeedFactory,
keys.TenantsTableID,
stopper,
1<<20, /* 1 MB */
tenantCapabilitiesTestingKnobs,
)

node := NewNode(
storeCfg,
recorder,
Expand Down
2 changes: 2 additions & 0 deletions pkg/spanconfig/spanconfigbounds/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
srcs = [
"bool_field.go",
"bounds.go",
"bounds_reader.go",
"constraints_field.go",
"doc.go",
"fields.go",
Expand All @@ -21,6 +22,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/config",
"//pkg/multitenant/tenantcapabilities",
"//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb",
"//pkg/roachpb",
"//pkg/util/protoutil",
Expand Down
66 changes: 66 additions & 0 deletions pkg/spanconfig/spanconfigbounds/bounds_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package spanconfigbounds

import (
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiespb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// Reader maintains an in-memory view of the global SpanConfigBounds state.
//
// SpanConfigBounds are stored as tenant capabilities, the state of which is
// surfaced by the tenantcapabilities.Reader. BoundsReader serves as a narrow,
// adapter interface for the same.
type Reader interface {
// Bounds returns span config bounds set for a given tenant. If no bounds have
// been configured for the given tenant, found returns false.
Bounds(tenID roachpb.TenantID) (_ Bounds, found bool)
}

type boundsReader struct {
capabilitiesReader tenantcapabilities.Reader
}

// NewReader constructs and returns a new Reader.
func NewReader(reader tenantcapabilities.Reader) Reader {
return &boundsReader{
capabilitiesReader: reader,
}
}

// Bounds implements the BoundsReader interface.
func (r *boundsReader) Bounds(tenID roachpb.TenantID) (_ Bounds, found bool) {
capabilities, found := r.capabilitiesReader.GetCapabilities(tenID)
if !found {
return Bounds{}, false
}

boundspb := capabilities.Cap(tenantcapabilities.TenantSpanConfigBounds).Get().Unwrap().(*tenantcapabilitiespb.SpanConfigBounds)
if boundspb == nil {
return Bounds{}, false
}
return MakeBounds(boundspb), true
}

// NewEmptyReader returns a new Reader which corresponds to an empty span config
// bounds state. It's only intended for testing.
func NewEmptyReader() Reader {
return emptyReader(true)
}

type emptyReader bool

// Bounds implements the Reader interface.
func (emptyReader) Bounds(roachpb.TenantID) (Bounds, bool) {
return Bounds{}, false
}
3 changes: 0 additions & 3 deletions pkg/spanconfig/spanconfigbounds/int32field.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ func (f int32Field) SafeFormat(s redact.SafePrinter, verb rune) {

func (f int32Field) FieldBound(b Bounds) ValueBounds {
getBound := func() *tenantcapabilitiespb.SpanConfigBounds_Int32Range {
if b.b.ConstraintBounds == nil {
return nil
}
switch f {
case numReplicas:
return b.b.NumReplicas
Expand Down
2 changes: 1 addition & 1 deletion pkg/spanconfig/spanconfigbounds/span_config_bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (b Bounds) Check(c *roachpb.SpanConfig) Violations {

func (b Bounds) clamp(c *roachpb.SpanConfig, reporter func(Field)) (changed bool) {
for _, f := range fields {
if b := f.FieldBound(b); !b.clamp(c, f) {
if bb := f.FieldBound(b); !bb.clamp(c, f) {
continue
}
changed = true
Expand Down
20 changes: 15 additions & 5 deletions pkg/spanconfig/spanconfigbounds/testdata/basic
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
bounds name=foo
gc_ttl_seconds: <start: 123, end: 7000>
range_max_bytes: <start: 10, end: 20>
num_voters: <start: 3, end: 5>
num_replicas: <start: 3, end: 11>
----

config name=bar
gc_policy: <ttl_seconds: 122>
range_min_bytes: 5
range_max_bytes: 5
num_voters: 7
num_replicas: 7
----

conforms bounds=foo config=bar
Expand All @@ -15,23 +19,29 @@ false

check bounds=foo config=bar
----
span config bounds violated for fields: range_max_bytes
span config bounds violated for fields: range_max_bytes
(1) span config bounds violated for fields: range_max_bytes
span config bounds violated for fields: range_max_bytes, num_voters, gc.ttlseconds
span config bounds violated for fields: range_max_bytes, num_voters, gc.ttlseconds
(1) span config bounds violated for fields: range_max_bytes, num_voters, gc.ttlseconds
| range_max_bytes: 5 does not conform to [10, 20], will be clamped to 10
| num_voters: 7 does not conform to [3, 5], will be clamped to 5
| gc.ttlseconds: 122 does not conform to [123, 7000], will be clamped to 123
Error types: (1) *spanconfigbounds.ViolationError


clamp bounds=foo config=bar
----
----
@@ -1,6 +1,6 @@
@@ -1,8 +1,8 @@
range_min_bytes: 5
-range_max_bytes: 5
+range_max_bytes: 10
gc_policy: <
ttl_seconds: 122
- ttl_seconds: 122
+ ttl_seconds: 123
>
num_replicas: 7
-num_voters: 7
+num_voters: 5

----
----
2 changes: 2 additions & 0 deletions pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigbounds",
"//pkg/spanconfig/spanconfigstore",
"//pkg/sql/catalog",
"//pkg/sql/catalog/systemschema",
Expand Down Expand Up @@ -63,6 +64,7 @@ go_test(
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigbounds",
"//pkg/spanconfig/spanconfigkvaccessor",
"//pkg/spanconfig/spanconfigtestutils",
"//pkg/sql/isql",
Expand Down
3 changes: 2 additions & 1 deletion pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigbounds"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils"
Expand Down Expand Up @@ -100,7 +101,6 @@ import (
// Text of the form [a,b) and [a,b):C correspond to spans and span config
// records; see spanconfigtestutils.Parse{Span,Config,SpanConfigRecord} for more
// details.
// TODO(arul): Add ability to express tenant spans to this datadriven test.
func TestDataDriven(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -148,6 +148,7 @@ func TestDataDriven(t *testing.T) {
10<<20, /* 10 MB */
spanconfigtestutils.ParseConfig(t, "FALLBACK"),
tc.Server(0).ClusterSettings(),
spanconfigbounds.NewEmptyReader(),
&spanconfig.TestingKnobs{
KVSubscriberRangeFeedKnobs: &rangefeedcache.TestingKnobs{
OnTimestampAdvance: func(ts hlc.Timestamp) {
Expand Down
18 changes: 12 additions & 6 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigbounds"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -139,6 +140,9 @@ type KVSubscriber struct {

clock *hlc.Clock
metrics *Metrics

// boundsReader provides a handle to the global SpanConfigBounds state.
boundsReader spanconfigbounds.Reader
}

var _ spanconfig.KVSubscriber = &KVSubscriber{}
Expand Down Expand Up @@ -186,6 +190,7 @@ func New(
bufferMemLimit int64,
fallback roachpb.SpanConfig,
settings *cluster.Settings,
boundsReader spanconfigbounds.Reader,
knobs *spanconfig.TestingKnobs,
registry *metric.Registry,
) *KVSubscriber {
Expand All @@ -200,12 +205,13 @@ func New(
Key: spanConfigTableStart,
EndKey: spanConfigTableStart.PrefixEnd(),
}
spanConfigStore := spanconfigstore.New(fallback, settings, knobs)
spanConfigStore := spanconfigstore.New(fallback, settings, boundsReader, knobs)
s := &KVSubscriber{
fallback: fallback,
knobs: knobs,
settings: settings,
clock: clock,
fallback: fallback,
knobs: knobs,
settings: settings,
clock: clock,
boundsReader: boundsReader,
}
var rfCacheKnobs *rangefeedcache.TestingKnobs
if knobs != nil {
Expand Down Expand Up @@ -395,7 +401,7 @@ func (s *KVSubscriber) handleUpdate(ctx context.Context, u rangefeedcache.Update
func (s *KVSubscriber) handleCompleteUpdate(
ctx context.Context, ts hlc.Timestamp, events []rangefeedbuffer.Event,
) {
freshStore := spanconfigstore.New(s.fallback, s.settings, s.knobs)
freshStore := spanconfigstore.New(s.fallback, s.settings, s.boundsReader, s.knobs)
for _, ev := range events {
freshStore.Apply(ctx, false /* dryrun */, ev.(*bufferEvent).Update)
}
Expand Down
Loading