Skip to content

Commit

Permalink
spanconfig: introduce spanconfig.{KVWatcher,StoreWriter}
Browse files Browse the repository at this point in the history
The KVWatcher is a per-store listener over `system.span_configurations`.
It listens in on updates to the global span configurations state, and
uses these updates to maintain a (new) in-memory spanconfig.StoreWriter
datastructure.

This datastructure satisfies the spanconfig.Store interface, and as such
can be used to replace the gossiped config.SystemConfig.

Release note: None
Release justification: non-production code changes
  • Loading branch information
irfansharif committed Sep 14, 2021
1 parent ce9e3b7 commit 9372967
Show file tree
Hide file tree
Showing 38 changed files with 1,879 additions and 81 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ ALL_TESTS = [
"//pkg/server:server_test",
"//pkg/settings:settings_test",
"//pkg/spanconfig/spanconfigkvaccessor:spanconfigkvaccessor_test",
"//pkg/spanconfig/spanconfigkvwatcher:spanconfigkvwatcher_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/spanconfig/spanconfigstore:spanconfigstore_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catalogkv:catalogkv_test",
"//pkg/sql/catalog/catformat:catformat_test",
Expand Down
3 changes: 3 additions & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ type TestServerArgs struct {
// IF set, the demo login endpoint will be enabled.
EnableDemoLoginEndpoint bool

// If set, the experimental span configs infrastructure will be enabled.
EnableExperimentalSpanConfigs bool

// If set, a TraceDir is initialized at the provided path.
TraceDir string

Expand Down
5 changes: 5 additions & 0 deletions pkg/config/zonepb/zone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1429,3 +1429,8 @@ func TestZoneConfigToSpanConfigConversion(t *testing.T) {
require.Equal(t, tc.expectSpanConfig, spanConfig)
}
}

func TestDefaultZoneAndSpanConfigs(t *testing.T) {
converted := DefaultZoneConfigRef().AsSpanConfig()
require.True(t, converted.Equal(roachpb.TestingDefaultSpanConfig()))
}
9 changes: 5 additions & 4 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,11 @@ const (
ZonesTableConfigColumnID = 2
ZonesTableConfigColFamID = 2

DescriptorTablePrimaryKeyIndexID = 1
DescriptorTableDescriptorColID = 2
DescriptorTableDescriptorColFamID = 2
TenantsTablePrimaryKeyIndexID = 1
DescriptorTablePrimaryKeyIndexID = 1
DescriptorTableDescriptorColID = 2
DescriptorTableDescriptorColFamID = 2
TenantsTablePrimaryKeyIndexID = 1
SpanConfigurationsTablePrimaryKeyIndexID = 1

// Reserved IDs for other system tables. Note that some of these IDs refer
// to "Ranges" instead of a Table - these IDs are needed to store custom
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestRangeSplitsWithWritePressure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// Override default span config.
cfg := kvserver.TestingDefaultSpanConfig()
cfg := roachpb.TestingDefaultSpanConfig()
cfg.RangeMaxBytes = 1 << 18

// Manually create the local test cluster so that the split queue
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigstore",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/storage",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/queue_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestBaseQueueConcurrent(t *testing.T) {
cfg: StoreConfig{
Clock: hlc.NewClock(hlc.UnixNano, time.Second),
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
DefaultSpanConfig: TestingDefaultSpanConfig(),
DefaultSpanConfig: roachpb.TestingDefaultSpanConfig(),
},
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/split_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestSplitQueueShouldQueue(t *testing.T) {
repl.mu.Lock()
repl.mu.state.Stats = &enginepb.MVCCStats{KeyBytes: test.bytes}
repl.mu.Unlock()
conf := TestingDefaultSpanConfig()
conf := roachpb.TestingDefaultSpanConfig()
conf.RangeMaxBytes = test.maxBytes
repl.SetSpanConfig(conf)

Expand Down
164 changes: 146 additions & 18 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -202,7 +203,6 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig {
st := cluster.MakeTestingClusterSettings()
sc := StoreConfig{
DefaultSpanConfig: zonepb.DefaultZoneConfigRef().AsSpanConfig(),
DefaultSystemSpanConfig: zonepb.DefaultSystemZoneConfigRef().AsSpanConfig(),
Settings: st,
AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
Clock: clock,
Expand Down Expand Up @@ -624,6 +624,7 @@ type Store struct {

computeInitialMetrics sync.Once
systemConfigUpdateQueueRateLimiter *quotapool.RateLimiter
spanConfigUpdateQueueRateLimiter *quotapool.RateLimiter
}

var _ kv.Sender = &Store{}
Expand All @@ -636,18 +637,17 @@ type StoreConfig struct {
AmbientCtx log.AmbientContext
base.RaftConfig

DefaultSpanConfig roachpb.SpanConfig
DefaultSystemSpanConfig roachpb.SpanConfig
Settings *cluster.Settings
Clock *hlc.Clock
DB *kv.DB
Gossip *gossip.Gossip
NodeLiveness *liveness.NodeLiveness
StorePool *StorePool
Transport *RaftTransport
NodeDialer *nodedialer.Dialer
RPCContext *rpc.Context
RangeDescriptorCache *rangecache.RangeCache
DefaultSpanConfig roachpb.SpanConfig
Settings *cluster.Settings
Clock *hlc.Clock
DB *kv.DB
Gossip *gossip.Gossip
NodeLiveness *liveness.NodeLiveness
StorePool *StorePool
Transport *RaftTransport
NodeDialer *nodedialer.Dialer
RPCContext *rpc.Context
RangeDescriptorCache *rangecache.RangeCache

ClosedTimestamp *container.Container
ClosedTimestampSender *sidetransport.Sender
Expand Down Expand Up @@ -733,6 +733,10 @@ type StoreConfig struct {
// SpanConfigsEnabled determines whether we're able to use the span configs
// infrastructure.
SpanConfigsEnabled bool
// Used to watch for span configuration changes.
SpanConfigWatcher spanconfig.KVWatcher
// SpanConfigStore is used to store and retrieve span configs.
SpanConfigStore spanconfig.Store
}

// ConsistencyTestingKnobs is a BatchEvalTestingKnobs struct used to control the
Expand Down Expand Up @@ -1556,6 +1560,44 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
s.cfg.NodeLiveness.RegisterCallback(s.nodeIsLiveCallback)
}

if s.cfg.SpanConfigsEnabled {
// When toggling between the system config span and the span configs
// infrastructure, we want to re-apply all span configs on all
// replicas from whatever the new source is.
spanconfigstore.EnabledSetting.SetOnChange(&s.ClusterSettings().SV, func(ctx context.Context) {
enabled := spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV)
if !enabled {
if s.cfg.Gossip != nil && s.cfg.Gossip.GetSystemConfig() != nil {
s.systemGossipUpdate(s.cfg.Gossip.GetSystemConfig())
}

return
}

now := s.cfg.Clock.NowAsClockTimestamp()
shouldQueue := s.spanConfigUpdateQueueRateLimiter.AdmitN(1)
newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
replCtx := repl.AnnotateCtx(ctx)
key := repl.Desc().StartKey
conf, err := s.cfg.SpanConfigStore.GetSpanConfigForKey(replCtx, key)
if err != nil {
log.Fatalf(ctx, "%v", err)
}

repl.SetSpanConfig(conf)
if shouldQueue {
s.splitQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
s.mergeQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
}
return true // more
})
})
}

// Gossip is only ever nil while bootstrapping a cluster and
// in unittests.
if s.cfg.Gossip != nil {
Expand Down Expand Up @@ -1594,6 +1636,32 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
})
}

// SpanConfigWatcher is nil unless COCKROACH_EXPERIMENTAL_SPAN_CONFIGS
// is set, and in unit tests.
if s.cfg.SpanConfigsEnabled && s.cfg.SpanConfigWatcher != nil && !s.cfg.TestingKnobs.DisableSpanConfigWatcher {
// Start the watcher process to listen in on span config updates and
// propagate said updates to the underlying store.
spanConfigUpdateC, err := s.cfg.SpanConfigWatcher.WatchForKVUpdates(ctx)
if err != nil {
return err
}
if err := s.stopper.RunAsyncTask(ctx, "spanconfig-watcher", func(context.Context) {
for {
select {
case update := <-spanConfigUpdateC:
if interceptor := s.TestingKnobs().SpanConfigUpdateInterceptor; interceptor != nil {
interceptor(update)
}
s.onSpanConfigUpdate(ctx, update)
case <-s.stopper.ShouldQuiesce():
return
}
}
}); err != nil {
return err
}
}

if !s.cfg.TestingKnobs.DisableAutomaticLeaseRenewal {
s.startLeaseRenewer(ctx)
}
Expand Down Expand Up @@ -1752,6 +1820,10 @@ func (s *Store) GetConfReader() (spanconfig.StoreReader, error) {
return nil, errSysCfgUnavailable
}

if s.cfg.SpanConfigsEnabled && spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) {
return spanconfigstore.NewShadowReader(s.cfg.SpanConfigStore, sysCfg), nil
}

return sysCfg, nil
}

Expand Down Expand Up @@ -1997,6 +2069,67 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
})
}

// onSpanConfigUpdate is the callback invoked whenever this store learns of a
// span config update.
func (s *Store) onSpanConfigUpdate(ctx context.Context, update spanconfig.Update) {
if log.ExpensiveLogEnabled(ctx, 1) {
log.Infof(ctx, "received update span=%s conf=%s deleted=%t", update.Span, update.Config.String(), update.Deletion())
}

s.cfg.SpanConfigStore.Apply(ctx, update, false)

if !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) {
return
}

// We'll want to offer all replicas to the split and merge queues. Be a
// little careful about not spawning too many individual goroutines.

now := s.cfg.Clock.NowAsClockTimestamp()
shouldQueue := s.spanConfigUpdateQueueRateLimiter.AdmitN(1)
newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
replCtx := repl.AnnotateCtx(ctx)
replicaSpan := repl.Desc().RSpan().AsRawSpanWithNoLocals()

if !update.Span.Overlaps(replicaSpan) {
return true // more
}

// TODO(irfansharif): It's possible for a config to be applied over an
// entire range when it only pertains to the first half of the range.
// This will be corrected shortly -- we enqueue the range for a split
// below where we then apply the right config on each half. But still,
// it's surprising behavior and gets in the way of a desirable
// consistency guarantee: a key's config at any point in time is one
// that was explicitly declared over it, or the default config. We can
// do better, we can skip applying the config entirely and enqueue the
// split, then relying on the split to install the right configs on each
// half. The current structure is as it is to maintain parity with the
// system config span variant.
key := repl.Desc().StartKey
conf, err := s.cfg.SpanConfigStore.GetSpanConfigForKey(replCtx, key)
if err != nil {
log.Fatalf(replCtx, "%v", err)
}

repl.SetSpanConfig(conf)

// TODO(irfansharif): For symmetry with the system config span variant,
// we queue blindly; we could instead only queue it if we knew the
// range's keyspans has a split in there somewhere, or was now part of a
// larger range and eligible for a merge.
if shouldQueue {
s.splitQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
s.mergeQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
}
return true // more
})
}

func (s *Store) asyncGossipStore(ctx context.Context, reason string, useCached bool) {
if err := s.stopper.RunAsyncTask(
ctx, fmt.Sprintf("storage.Store: gossip on %s", reason),
Expand Down Expand Up @@ -2982,8 +3115,3 @@ func min(a, b int) int {
}
return b
}

// TestingDefaultSpanConfig exposes the default span config for testing purposes.
func TestingDefaultSpanConfig() roachpb.SpanConfig {
return zonepb.DefaultZoneConfigRef().AsSpanConfig()
}
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -91,6 +92,8 @@ type StoreTestingKnobs struct {
// DisableAutomaticLeaseRenewal enables turning off the background worker
// that attempts to automatically renew expiration-based leases.
DisableAutomaticLeaseRenewal bool
// DisableSpanConfigWatcher disables the span config watcher process.
DisableSpanConfigWatcher bool
// LeaseRequestEvent, if set, is called when replica.requestLeaseLocked() is
// called to acquire a new lease. This can be used to assert that a request
// triggers a lease acquisition.
Expand Down Expand Up @@ -327,6 +330,9 @@ type StoreTestingKnobs struct {
// PurgeOutdatedReplicasInterceptor intercepts attempts to purge outdated
// replicas in the store.
PurgeOutdatedReplicasInterceptor func()
// SpanConfigUpdateInterceptor is called after the store hears about a span
// config update.
SpanConfigUpdateInterceptor func(spanconfig.Update)
// If set, use the given truncated state type when bootstrapping ranges.
// This is used for testing the truncated state migration.
TruncatedStateTypeOverride *stateloader.TruncatedStateType
Expand Down
20 changes: 20 additions & 0 deletions pkg/roachpb/span_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,23 @@ func (c ConstraintsConjunction) String() string {
}
return sb.String()
}

// TestingDefaultSpanConfig exports the default span config for testing purposes.
func TestingDefaultSpanConfig() SpanConfig {
return SpanConfig{
RangeMinBytes: 128 << 20, // 128 MB
RangeMaxBytes: 512 << 20, // 512 MB
// Use 25 hours instead of the previous 24 to make users successful by
// default. Users desiring to take incremental backups every 24h may
// incorrectly assume that the previous default 24h was sufficient to do
// that. But the equation for incremental backups is:
// GC TTLSeconds >= (desired backup interval) + (time to perform incremental backup)
// We think most new users' incremental backups will complete within an
// hour, and larger clusters will have more experienced operators and will
// understand how to change these settings if needed.
GCPolicy: GCPolicy{
TTLSeconds: 25 * 60 * 60,
},
NumReplicas: 3,
}
}
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ go_library(
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigjob",
"//pkg/spanconfig/spanconfigkvaccessor",
"//pkg/spanconfig/spanconfigkvwatcher",
"//pkg/spanconfig/spanconfigmanager",
"//pkg/spanconfig/spanconfigstore",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
Expand Down
1 change: 1 addition & 0 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ func (cfg *Config) readEnvironmentVariables() {
cfg.ScanInterval = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_INTERVAL", cfg.ScanInterval)
cfg.ScanMinIdleTime = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_MIN_IDLE_TIME", cfg.ScanMinIdleTime)
cfg.ScanMaxIdleTime = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_MAX_IDLE_TIME", cfg.ScanMaxIdleTime)
cfg.ExperimentalSpanConfigs = envutil.EnvOrDefaultBool("COCKROACH_EXPERIMENTAL_SPAN_CONFIGS", cfg.ExperimentalSpanConfigs)
}

// parseGossipBootstrapResolvers parses list of gossip bootstrap resolvers.
Expand Down
1 change: 1 addition & 0 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,7 @@ func (n *Node) UpdateSpanConfigs(
if !n.storeCfg.SpanConfigsEnabled {
return nil, errors.New("use of span configs disabled")
}

// TODO(irfansharif): We want to protect ourselves from tenants creating
// outlandishly large string buffers here and OOM-ing the host cluster. Is
// the maximum protobuf message size enough of a safeguard?
Expand Down
Loading

0 comments on commit 9372967

Please sign in to comment.