From cbecf621aa520f1463f8e5d678e25312936bbf54 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 24 Aug 2021 13:38:10 -0400 Subject: [PATCH] spanconfig: introduce spanconfig.{KVWatcher,StoreWriter} The KVWatcher is a per-store listener over `system.span_configurations`. It listens in on updates to the global span configurations state, and uses these updates to maintain a (new) in-memory spanconfig.StoreWriter datastructure. This datastructure satisfies the spanconfig.Store interface, and as such can be used to replace the gossiped config.SystemConfig. Release note: None Release justification: non-production code changes --- pkg/BUILD.bazel | 2 + pkg/base/test_server_args.go | 3 + pkg/config/zonepb/zone_test.go | 5 + pkg/keys/constants.go | 9 +- pkg/kv/kvclient/kvcoord/split_test.go | 2 +- pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/queue_concurrency_test.go | 2 +- pkg/kv/kvserver/split_queue_test.go | 2 +- pkg/kv/kvserver/store.go | 164 ++++++++- pkg/kv/kvserver/testing_knobs.go | 6 + pkg/roachpb/span_config.go | 20 ++ pkg/server/BUILD.bazel | 2 + pkg/server/server.go | 4 + pkg/spanconfig/spanconfig.go | 125 ++++++- pkg/spanconfig/spanconfigjob/job.go | 4 +- .../spanconfigkvaccessor/kvaccessor.go | 35 +- .../spanconfigkvwatcher/BUILD.bazel | 59 ++++ .../spanconfigkvwatcher/kv_watcher.go | 129 +++++++ .../spanconfigkvwatcher/kv_watcher_test.go | 139 ++++++++ .../spanconfigkvwatcher/main_test.go | 31 ++ .../span_config_decoder.go | 118 +++++++ .../span_config_decoder_test.go | 86 +++++ .../spanconfigmanager/manager_test.go | 4 + pkg/spanconfig/spanconfigstore/BUILD.bazel | 36 ++ pkg/spanconfig/spanconfigstore/shadow.go | 82 +++++ pkg/spanconfig/spanconfigstore/store.go | 289 ++++++++++++++++ pkg/spanconfig/spanconfigstore/store_test.go | 320 ++++++++++++++++++ pkg/spanconfig/spanconfigstore/testdata/basic | 95 ++++++ .../spanconfigstore/testdata/internal | 42 +++ .../spanconfigstore/testdata/overlap | 88 +++++ pkg/sql/catalog/systemschema/system.go | 9 +- 
pkg/util/interval/btree_based_interval.go | 4 +- .../interval/btree_based_interval_test.go | 4 +- pkg/util/interval/range_group.go | 24 +- 34 files changed, 1871 insertions(+), 74 deletions(-) create mode 100644 pkg/spanconfig/spanconfigkvwatcher/BUILD.bazel create mode 100644 pkg/spanconfig/spanconfigkvwatcher/kv_watcher.go create mode 100644 pkg/spanconfig/spanconfigkvwatcher/kv_watcher_test.go create mode 100644 pkg/spanconfig/spanconfigkvwatcher/main_test.go create mode 100644 pkg/spanconfig/spanconfigkvwatcher/span_config_decoder.go create mode 100644 pkg/spanconfig/spanconfigkvwatcher/span_config_decoder_test.go create mode 100644 pkg/spanconfig/spanconfigstore/BUILD.bazel create mode 100644 pkg/spanconfig/spanconfigstore/shadow.go create mode 100644 pkg/spanconfig/spanconfigstore/store.go create mode 100644 pkg/spanconfig/spanconfigstore/store_test.go create mode 100644 pkg/spanconfig/spanconfigstore/testdata/basic create mode 100644 pkg/spanconfig/spanconfigstore/testdata/internal create mode 100644 pkg/spanconfig/spanconfigstore/testdata/overlap diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 303322b5098e..44a556c6a932 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -179,7 +179,9 @@ ALL_TESTS = [ "//pkg/server:server_test", "//pkg/settings:settings_test", "//pkg/spanconfig/spanconfigkvaccessor:spanconfigkvaccessor_test", + "//pkg/spanconfig/spanconfigkvwatcher:spanconfigkvwatcher_test", "//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test", + "//pkg/spanconfig/spanconfigstore:spanconfigstore_test", "//pkg/sql/catalog/catalogkeys:catalogkeys_test", "//pkg/sql/catalog/catalogkv:catalogkv_test", "//pkg/sql/catalog/catformat:catformat_test", diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 0a76ee9da209..0bdc80e9322e 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -130,6 +130,9 @@ type TestServerArgs struct { // IF set, the demo login endpoint will be enabled. 
EnableDemoLoginEndpoint bool + // If set, the experimental span configs infrastructure will be enabled. + EnableExperimentalSpanConfigs bool + // If set, a TraceDir is initialized at the provided path. TraceDir string diff --git a/pkg/config/zonepb/zone_test.go b/pkg/config/zonepb/zone_test.go index df9189d33709..2ac12278f5b2 100644 --- a/pkg/config/zonepb/zone_test.go +++ b/pkg/config/zonepb/zone_test.go @@ -1429,3 +1429,8 @@ func TestZoneConfigToSpanConfigConversion(t *testing.T) { require.Equal(t, tc.expectSpanConfig, spanConfig) } } + +func TestDefaultZoneAndSpanConfigs(t *testing.T) { + converted := DefaultZoneConfigRef().AsSpanConfig() + require.True(t, converted.Equal(roachpb.TestingDefaultSpanConfig())) +} diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index fd84110f3750..640cde81332e 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -360,10 +360,11 @@ const ( ZonesTableConfigColumnID = 2 ZonesTableConfigColFamID = 2 - DescriptorTablePrimaryKeyIndexID = 1 - DescriptorTableDescriptorColID = 2 - DescriptorTableDescriptorColFamID = 2 - TenantsTablePrimaryKeyIndexID = 1 + DescriptorTablePrimaryKeyIndexID = 1 + DescriptorTableDescriptorColID = 2 + DescriptorTableDescriptorColFamID = 2 + TenantsTablePrimaryKeyIndexID = 1 + SpanConfigurationsTablePrimaryKeyIndexID = 1 // Reserved IDs for other system tables. Note that some of these IDs refer // to "Ranges" instead of a Table - these IDs are needed to store custom diff --git a/pkg/kv/kvclient/kvcoord/split_test.go b/pkg/kv/kvclient/kvcoord/split_test.go index 9552061812f5..dca4e2759c7f 100644 --- a/pkg/kv/kvclient/kvcoord/split_test.go +++ b/pkg/kv/kvclient/kvcoord/split_test.go @@ -175,7 +175,7 @@ func TestRangeSplitsWithWritePressure(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) // Override default span config. 
- cfg := kvserver.TestingDefaultSpanConfig() + cfg := roachpb.TestingDefaultSpanConfig() cfg.RangeMaxBytes = 1 << 18 // Manually create the local test cluster so that the split queue diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 7af30b66a728..85447dc9a19a 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -151,6 +151,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigstore", "//pkg/sql/sessiondata", "//pkg/sql/sqlutil", "//pkg/storage", diff --git a/pkg/kv/kvserver/queue_concurrency_test.go b/pkg/kv/kvserver/queue_concurrency_test.go index ac1ac6ce0cf6..637ab85e1828 100644 --- a/pkg/kv/kvserver/queue_concurrency_test.go +++ b/pkg/kv/kvserver/queue_concurrency_test.go @@ -71,7 +71,7 @@ func TestBaseQueueConcurrent(t *testing.T) { cfg: StoreConfig{ Clock: hlc.NewClock(hlc.UnixNano, time.Second), AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, - DefaultSpanConfig: TestingDefaultSpanConfig(), + DefaultSpanConfig: roachpb.TestingDefaultSpanConfig(), }, } diff --git a/pkg/kv/kvserver/split_queue_test.go b/pkg/kv/kvserver/split_queue_test.go index 889bb76f084f..7b232da6482c 100644 --- a/pkg/kv/kvserver/split_queue_test.go +++ b/pkg/kv/kvserver/split_queue_test.go @@ -89,7 +89,7 @@ func TestSplitQueueShouldQueue(t *testing.T) { repl.mu.Lock() repl.mu.state.Stats = &enginepb.MVCCStats{KeyBytes: test.bytes} repl.mu.Unlock() - conf := TestingDefaultSpanConfig() + conf := roachpb.TestingDefaultSpanConfig() conf.RangeMaxBytes = test.maxBytes repl.SetSpanConfig(conf) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index e3401bcb5bf4..e2daaade365a 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -53,6 +53,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" + 
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -215,7 +216,6 @@ func testStoreConfig(clock *hlc.Clock, version roachpb.Version) StoreConfig { st := cluster.MakeTestingClusterSettingsWithVersions(version, version, true) sc := StoreConfig{ DefaultSpanConfig: zonepb.DefaultZoneConfigRef().AsSpanConfig(), - DefaultSystemSpanConfig: zonepb.DefaultSystemZoneConfigRef().AsSpanConfig(), Settings: st, AmbientCtx: log.AmbientContext{Tracer: st.Tracer}, Clock: clock, @@ -649,6 +649,7 @@ type Store struct { computeInitialMetrics sync.Once systemConfigUpdateQueueRateLimiter *quotapool.RateLimiter + spanConfigUpdateQueueRateLimiter *quotapool.RateLimiter } var _ kv.Sender = &Store{} @@ -661,18 +662,17 @@ type StoreConfig struct { AmbientCtx log.AmbientContext base.RaftConfig - DefaultSpanConfig roachpb.SpanConfig - DefaultSystemSpanConfig roachpb.SpanConfig - Settings *cluster.Settings - Clock *hlc.Clock - DB *kv.DB - Gossip *gossip.Gossip - NodeLiveness *liveness.NodeLiveness - StorePool *StorePool - Transport *RaftTransport - NodeDialer *nodedialer.Dialer - RPCContext *rpc.Context - RangeDescriptorCache *rangecache.RangeCache + DefaultSpanConfig roachpb.SpanConfig + Settings *cluster.Settings + Clock *hlc.Clock + DB *kv.DB + Gossip *gossip.Gossip + NodeLiveness *liveness.NodeLiveness + StorePool *StorePool + Transport *RaftTransport + NodeDialer *nodedialer.Dialer + RPCContext *rpc.Context + RangeDescriptorCache *rangecache.RangeCache ClosedTimestampSender *sidetransport.Sender ClosedTimestampReceiver sidetransportReceiver @@ -757,6 +757,10 @@ type StoreConfig struct { // SpanConfigsEnabled determines whether we're able to use the span configs // infrastructure. SpanConfigsEnabled bool + // Used to watch for span configuration changes. 
+ SpanConfigWatcher spanconfig.KVWatcher + // SpanConfigStore is used to store and retrieve span configs. + SpanConfigStore spanconfig.Store } // ConsistencyTestingKnobs is a BatchEvalTestingKnobs struct used to control the @@ -1592,6 +1596,44 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { s.cfg.NodeLiveness.RegisterCallback(s.nodeIsLiveCallback) } + if s.cfg.SpanConfigsEnabled { + // When toggling between the system config span and the span configs + // infrastructure, we want to re-apply all span configs on all + // replicas from whatever the new source is. + spanconfigstore.EnabledSetting.SetOnChange(&s.ClusterSettings().SV, func(ctx context.Context) { + enabled := spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) + if !enabled { + if s.cfg.Gossip != nil && s.cfg.Gossip.GetSystemConfig() != nil { + s.systemGossipUpdate(s.cfg.Gossip.GetSystemConfig()) + } + + return + } + + now := s.cfg.Clock.NowAsClockTimestamp() + shouldQueue := s.spanConfigUpdateQueueRateLimiter.AdmitN(1) + newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { + replCtx := repl.AnnotateCtx(ctx) + key := repl.Desc().StartKey + conf, err := s.cfg.SpanConfigStore.GetSpanConfigForKey(replCtx, key) + if err != nil { + log.Fatalf(ctx, "%v", err) + } + + repl.SetSpanConfig(conf) + if shouldQueue { + s.splitQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + s.mergeQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + } + return true // more + }) + }) + } + // Gossip is only ever nil while bootstrapping a cluster and // in unittests. if s.cfg.Gossip != nil { @@ -1630,6 +1672,32 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { }) } + // SpanConfigWatcher is nil unless COCKROACH_EXPERIMENTAL_SPAN_CONFIGS + // is set, and in unit tests. 
+ if s.cfg.SpanConfigsEnabled && s.cfg.SpanConfigWatcher != nil && !s.cfg.TestingKnobs.DisableSpanConfigWatcher { + // Start the watcher process to listen in on span config updates and + // propagate said updates to the underlying store. + spanConfigUpdateC, err := s.cfg.SpanConfigWatcher.WatchForKVUpdates(ctx) + if err != nil { + return err + } + if err := s.stopper.RunAsyncTask(ctx, "spanconfig-watcher", func(context.Context) { + for { + select { + case update := <-spanConfigUpdateC: + if interceptor := s.TestingKnobs().SpanConfigUpdateInterceptor; interceptor != nil { + interceptor(update) + } + s.onSpanConfigUpdate(ctx, update) + case <-s.stopper.ShouldQuiesce(): + return + } + } + }); err != nil { + return err + } + } + if !s.cfg.TestingKnobs.DisableAutomaticLeaseRenewal { s.startLeaseRenewer(ctx) } @@ -1787,6 +1855,10 @@ func (s *Store) GetConfReader() (spanconfig.StoreReader, error) { return nil, errSysCfgUnavailable } + if s.cfg.SpanConfigsEnabled && spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) { + return spanconfigstore.NewShadowReader(s.cfg.SpanConfigStore, sysCfg), nil + } + return sysCfg, nil } @@ -1968,6 +2040,67 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) { }) } +// onSpanConfigUpdate is the callback invoked whenever this store learns of a +// span config update. +func (s *Store) onSpanConfigUpdate(ctx context.Context, update spanconfig.Update) { + if log.ExpensiveLogEnabled(ctx, 1) { + log.Infof(ctx, "received update span=%s conf=%s deleted=%t", update.Span, update.Config.String(), update.Deletion()) + } + + s.cfg.SpanConfigStore.Apply(ctx, update, false) + + if !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) { + return + } + + // We'll want to offer all replicas to the split and merge queues. Be a + // little careful about not spawning too many individual goroutines. 
+ + now := s.cfg.Clock.NowAsClockTimestamp() + shouldQueue := s.spanConfigUpdateQueueRateLimiter.AdmitN(1) + newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { + replCtx := repl.AnnotateCtx(ctx) + replicaSpan := repl.Desc().RSpan().AsRawSpanWithNoLocals() + + if !update.Span.Overlaps(replicaSpan) { + return true // more + } + + // TODO(irfansharif): It's possible for a config to be applied over an + // entire range when it only pertains to the first half of the range. + // This will be corrected shortly -- we enqueue the range for a split + // below where we then apply the right config on each half. But still, + // it's surprising behavior and gets in the way of a desirable + // consistency guarantee: a key's config at any point in time is one + // that was explicitly declared over it, or the default config. We can + // do better, we can skip applying the config entirely and enqueue the + // split, then rely on the split to install the right configs on each + // half. The current structure is as it is to maintain parity with the + // system config span variant. + key := repl.Desc().StartKey + conf, err := s.cfg.SpanConfigStore.GetSpanConfigForKey(replCtx, key) + if err != nil { + log.Fatalf(replCtx, "%v", err) + } + + repl.SetSpanConfig(conf) + + // TODO(irfansharif): For symmetry with the system config span variant, + // we queue blindly; we could instead only queue it if we knew the + // range's keyspan has a split in there somewhere, or was now part of a + // larger range and eligible for a merge. 
+ if shouldQueue { + s.splitQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + s.mergeQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + } + return true // more + }) +} + func (s *Store) asyncGossipStore(ctx context.Context, reason string, useCached bool) { if err := s.stopper.RunAsyncTask( ctx, fmt.Sprintf("storage.Store: gossip on %s", reason), @@ -2972,8 +3105,3 @@ func min(a, b int) int { } return b } - -// TestingDefaultSpanConfig exposes the default span config for testing purposes. -func TestingDefaultSpanConfig() roachpb.SpanConfig { - return zonepb.DefaultZoneConfigRef().AsSpanConfig() -} diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 786b01a86c98..bb23a4b961ec 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -91,6 +92,8 @@ type StoreTestingKnobs struct { // DisableAutomaticLeaseRenewal enables turning off the background worker // that attempts to automatically renew expiration-based leases. DisableAutomaticLeaseRenewal bool + // DisableSpanConfigWatcher disables the span config watcher process. + DisableSpanConfigWatcher bool // LeaseRequestEvent, if set, is called when replica.requestLeaseLocked() is // called to acquire a new lease. This can be used to assert that a request // triggers a lease acquisition. 
@@ -327,6 +330,9 @@ type StoreTestingKnobs struct { // PurgeOutdatedReplicasInterceptor intercepts attempts to purge outdated // replicas in the store. PurgeOutdatedReplicasInterceptor func() + // SpanConfigUpdateInterceptor is called after the store hears about a span + // config update. + SpanConfigUpdateInterceptor func(spanconfig.Update) // If set, use the given version as the initial replica version when // bootstrapping ranges. This is used for testing the migration // infrastructure. diff --git a/pkg/roachpb/span_config.go b/pkg/roachpb/span_config.go index 4ab884bc7fd0..13cbb19ec3f3 100644 --- a/pkg/roachpb/span_config.go +++ b/pkg/roachpb/span_config.go @@ -98,3 +98,23 @@ func (c ConstraintsConjunction) String() string { } return sb.String() } + +// TestingDefaultSpanConfig exports the default span config for testing purposes. +func TestingDefaultSpanConfig() SpanConfig { + return SpanConfig{ + RangeMinBytes: 128 << 20, // 128 MB + RangeMaxBytes: 512 << 20, // 512 MB + // Use 25 hours instead of the previous 24 to make users successful by + // default. Users desiring to take incremental backups every 24h may + // incorrectly assume that the previous default 24h was sufficient to do + // that. But the equation for incremental backups is: + // GC TTLSeconds >= (desired backup interval) + (time to perform incremental backup) + // We think most new users' incremental backups will complete within an + // hour, and larger clusters will have more experienced operators and will + // understand how to change these settings if needed. 
+ GCPolicy: GCPolicy{ + TTLSeconds: 25 * 60 * 60, + }, + NumReplicas: 3, + } +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index acfa68099dd0..5536d318e405 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -113,7 +113,9 @@ go_library( "//pkg/spanconfig", "//pkg/spanconfig/spanconfigjob", "//pkg/spanconfig/spanconfigkvaccessor", + "//pkg/spanconfig/spanconfigkvwatcher", "//pkg/spanconfig/spanconfigmanager", + "//pkg/spanconfig/spanconfigstore", "//pkg/sql", "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catalogkeys", diff --git a/pkg/server/server.go b/pkg/server/server.go index 7342a99e84d7..445600c0dfd5 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -71,6 +71,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" _ "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob" // register jobs declared outside of pkg/sql "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvwatcher" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/contention" @@ -607,6 +609,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { var spanConfigAccessor spanconfig.KVAccessor if cfg.SpanConfigsEnabled { storeCfg.SpanConfigsEnabled = true + storeCfg.SpanConfigWatcher = spanconfigkvwatcher.New(stopper, db, clock, rangeFeedFactory, keys.SpanConfigurationsTableID) + storeCfg.SpanConfigStore = spanconfigstore.New(storeCfg.DefaultSpanConfig) spanConfigAccessor = spanconfigkvaccessor.New( db, internalExecutor, cfg.Settings, systemschema.SpanConfigurationsTableName.FQString(), diff --git a/pkg/spanconfig/spanconfig.go b/pkg/spanconfig/spanconfig.go index 58997d41d354..cb17b16f6bb4 100644 --- a/pkg/spanconfig/spanconfig.go +++ b/pkg/spanconfig/spanconfig.go @@ -21,7 
+21,10 @@ import ( type KVAccessor interface { // GetSpanConfigEntriesFor returns the span configurations that overlap with // the given spans. - GetSpanConfigEntriesFor(ctx context.Context, spans []roachpb.Span) ([]roachpb.SpanConfigEntry, error) + GetSpanConfigEntriesFor( + ctx context.Context, + spans []roachpb.Span, + ) ([]roachpb.SpanConfigEntry, error) // UpdateSpanConfigEntries updates configurations for the given spans. This // is a "targeted" API: the spans being deleted are expected to have been @@ -30,7 +33,16 @@ type KVAccessor interface { // divvying up an existing span into multiple others with distinct configs, // callers are to issue a delete for the previous span and upserts for the // new ones. - UpdateSpanConfigEntries(ctx context.Context, toDelete []roachpb.Span, toUpsert []roachpb.SpanConfigEntry) error + UpdateSpanConfigEntries( + ctx context.Context, + toDelete []roachpb.Span, + toUpsert []roachpb.SpanConfigEntry, + ) error +} + +// KVWatcher emits KV span configuration updates. +type KVWatcher interface { + WatchForKVUpdates(ctx context.Context) (<-chan Update, error) } // ReconciliationDependencies captures what's needed by the span config @@ -47,24 +59,111 @@ type ReconciliationDependencies interface { // through the KVAccessor. } -// Store is a data structure used to store span configs. +// Store is a data structure used to store spans and their corresponding +// configs. type Store interface { StoreReader - - // TODO(irfansharif): We'll want to add a StoreWriter interface here once we - // implement a data structure to store span configs. We expect this data - // structure to be used in KV to eventually replace the use of the - // gossip-backed system config span. + StoreWriter } -// Silence the unused linter. -var _ Store = nil +// StoreWriter is the write-only portion of the Store interface. +type StoreWriter interface { + // Apply applies the given update[1]. 
It also returns the existing spans that + // were deleted and entries that were newly added to make room for the + // update. The deleted list can double as a list of overlapping spans in the + // Store, provided the update is not a no-op[2]. + // + // Span configs are stored in non-overlapping fashion. When an update + // overlaps with existing configs, they're deleted. If the overlap is only + // partial, the non-overlapping components are re-added. If the update + // itself is adding an entry, that too is added. This is best illustrated + // with the following example: + // + // [--- X --) is a span with config X + // + // Store | [--- A ----)[------------- B -----------)[---------- C -----) + // Update | [------------------ D -------------) + // | + // Deleted | [------------- B -----------)[---------- C -----) + // Added | [------------------ D -------------)[--- C -----) + // Store* | [--- A ----)[------------------ D -------------)[--- C -----) + // + // + // + // TODO(irfansharif): We'll make use of the dryrun option in a future PR + // when wiring up the reconciliation job to use the KVAccessor. Since the + // KVAccessor is a "targeted" API (the spans being deleted/upserted + // have to already be present with the exact same bounds), we'll dryrun an + // update against a StoreWriter (pre-populated with the entries present in + // KV) to generate the targeted deletes and upserts we'd need to issue. + // After successfully installing them in KV, we can keep our StoreWriter + // up-to-date by actually applying the update. + // + // There's also the question of a "full reconciliation pass". We'll be + // generating updates reactively listening in on changes to + // system.{descriptor,zones} (see SQLWatcher). It's possible then for a + // suspended tenant's table history to be GC-ed away and for its SQLWatcher + // to never detect that a certain table/index/partition has been deleted. 
+ // Left as is, this results in us never issuing a corresponding span config + // deletion request. We'd be leaving a bunch of delete-able span configs + // lying around, and a bunch of empty ranges as a result of those. A "full + // reconciliation pass" is our attempt to find all these extraneous entries + // in KV and to delete them. + // + // We can use a StoreWriter here too (one that's pre-populated with the + // contents of KVAccessor, as before). We'd iterate through all descriptors, + // find all overlapping spans and issue KVAccessor deletes for them + upsert + // the descriptor's span config[3]. As for the StoreWriter itself, we'd + // simply delete the overlapping entries. After iterating through all the + // descriptors, we'd finally issue KVAccessor deletes for all span configs + // still remaining in the Store. + // + // TODO(irfansharif): The descriptions above presume holding the entire set + // of span configs in memory, but we could break away from that by adding + // pagination + retrieval limit to the GetSpanConfigEntriesFor API. We'd + // then paginate through chunks of the keyspace at a time, do a "full + // reconciliation pass" over just that chunk, and continue. + // + // [1]: Unless dryrun is true. We'll still generate the same {deleted,added} + // lists. + // [2]: We could instead expose a GetAllOverlapping() API if needed -- would + // make things a bit clearer. + // [3]: We could skip the delete + upsert if it's a no-op, i.e. the + // overlapping span config is the same as the one we'd be looking to + // upsert. Using Apply (dryrun=true) for e.g. would return empty lists, + // indicating a no-op. + Apply(ctx context.Context, update Update, dryrun bool) ( + deleted []roachpb.Span, added []roachpb.SpanConfigEntry, + ) +} -// StoreReader is the read-only portion of the Store interface. 
It's an adaptor -// interface implemented by config.SystemConfig to let us later swap out the -// source with one backed by a view of `system.span_configurations`. +// StoreReader is the read-only portion of the Store interface. It doubles as an +// adaptor interface for config.SystemConfig. type StoreReader interface { NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool ComputeSplitKey(ctx context.Context, start, end roachpb.RKey) roachpb.RKey GetSpanConfigForKey(ctx context.Context, key roachpb.RKey) (roachpb.SpanConfig, error) } + +// Update captures what span has seen a config change. It's the unit of what a +// {SQL,KV}Watcher emits, and what can be applied to a StoreWriter. +type Update struct { + // Span captures the key span being updated. + Span roachpb.Span + + // Config captures the span config the key span was updated to. An empty + // config indicates the span config being deleted. + Config roachpb.SpanConfig +} + +// Deletion returns true if the update corresponds to a span config being +// deleted. +func (u Update) Deletion() bool { + return u.Config.IsEmpty() +} + +// Addition returns true if the update corresponds to a span config being +// added. +func (u Update) Addition() bool { + return !u.Deletion() +} diff --git a/pkg/spanconfig/spanconfigjob/job.go b/pkg/spanconfig/spanconfigjob/job.go index c748b70554c4..fa461a0d11cb 100644 --- a/pkg/spanconfig/spanconfigjob/job.go +++ b/pkg/spanconfig/spanconfigjob/job.go @@ -30,8 +30,8 @@ var _ jobs.Resumer = (*resumer)(nil) func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) error { execCtx := execCtxI.(sql.JobExecContext) rc := execCtx.SpanConfigReconciliationJobDeps() - // TODO(zcfg-pod): Upcoming PRs will actually make use of these reconciliation - // dependencies. + + // TODO(irfansharif): Actually make use of these dependencies. 
_ = rc <-ctx.Done() diff --git a/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go b/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go index edb2f22a2589..53d5cb83aa6c 100644 --- a/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go +++ b/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go @@ -32,23 +32,24 @@ import ( // KVAccessor provides read/write access to all the span configurations for a // CRDB cluster. It's a concrete implementation of the KVAccessor interface. type KVAccessor struct { - db *kv.DB - ie sqlutil.InternalExecutor - settings *cluster.Settings - tableName string // typically system.span_configurations, but overridable for testing purposes + db *kv.DB + ie sqlutil.InternalExecutor + settings *cluster.Settings + + spanConfigurationsTableFQN string // typically 'system.span_configurations', but overridable for testing purposes } var _ spanconfig.KVAccessor = &KVAccessor{} -// New constructs a new Manager. +// New constructs a new KVAccessor. func New( db *kv.DB, ie sqlutil.InternalExecutor, settings *cluster.Settings, tableFQN string, ) *KVAccessor { return &KVAccessor{ - db: db, - ie: ie, - settings: settings, - tableName: tableFQN, + db: db, + ie: ie, + settings: settings, + spanConfigurationsTableFQN: tableFQN, } } @@ -248,9 +249,9 @@ SELECT start_key, end_key, config FROM ( WHERE start_key < $%[2]d ORDER BY start_key DESC LIMIT 1 ) WHERE end_key > $%[2]d `, - k.tableName, // [1] - startKeyIdx+1, // [2] -- prepared statement placeholder (1-indexed) - endKeyIdx+1, // [3] -- prepared statement placeholder (1-indexed) + k.spanConfigurationsTableFQN, // [1] + startKeyIdx+1, // [2] -- prepared statement placeholder (1-indexed) + endKeyIdx+1, // [3] -- prepared statement placeholder (1-indexed) ) } return getStmtBuilder.String(), queryArgs @@ -275,7 +276,7 @@ func (k *KVAccessor) constructDeleteStmtAndArgs(toDelete []roachpb.Span) (string startKeyIdx+1, endKeyIdx+1) // prepared statement placeholders (1-indexed) } deleteStmt := fmt.Sprintf(`DELETE FROM 
%[1]s WHERE (start_key, end_key) IN (VALUES %[2]s)`, - k.tableName, strings.Join(values, ", ")) + k.spanConfigurationsTableFQN, strings.Join(values, ", ")) return deleteStmt, deleteQueryArgs } @@ -306,7 +307,7 @@ func (k *KVAccessor) constructUpsertStmtAndArgs( startKeyIdx+1, endKeyIdx+1, configIdx+1) // prepared statement placeholders (1-indexed) } upsertStmt := fmt.Sprintf(`UPSERT INTO %[1]s (start_key, end_key, config) VALUES %[2]s`, - k.tableName, strings.Join(upsertValues, ", ")) + k.spanConfigurationsTableFQN, strings.Join(upsertValues, ", ")) return upsertStmt, upsertQueryArgs, nil } @@ -369,9 +370,9 @@ SELECT count(*) = 1 FROM ( ) WHERE end_key > $%[2]d ) `, - k.tableName, // [1] - startKeyIdx+1, // [2] -- prepared statement placeholder (1-indexed) - endKeyIdx+1, // [3] -- prepared statement placeholder (1-indexed) + k.spanConfigurationsTableFQN, // [1] + startKeyIdx+1, // [2] -- prepared statement placeholder (1-indexed) + endKeyIdx+1, // [3] -- prepared statement placeholder (1-indexed) ) } validationStmt := fmt.Sprintf("SELECT true = ALL(%s)", validationInnerStmtBuilder.String()) diff --git a/pkg/spanconfig/spanconfigkvwatcher/BUILD.bazel b/pkg/spanconfig/spanconfigkvwatcher/BUILD.bazel new file mode 100644 index 000000000000..dc98e746a508 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvwatcher/BUILD.bazel @@ -0,0 +1,59 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "spanconfigkvwatcher", + srcs = [ + "kv_watcher.go", + "span_config_decoder.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvwatcher", + visibility = ["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/kv", + "//pkg/kv/kvclient/rangefeed:with-mocks", + "//pkg/roachpb:with-mocks", + "//pkg/spanconfig", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/systemschema", + "//pkg/sql/row", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", + "//pkg/sql/types", + "//pkg/util/encoding", + 
"//pkg/util/grpcutil", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/protoutil", + "//pkg/util/stop", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "spanconfigkvwatcher_test", + srcs = [ + "kv_watcher_test.go", + "main_test.go", + "span_config_decoder_test.go", + ], + deps = [ + ":spanconfigkvwatcher", + "//pkg/base", + "//pkg/keys", + "//pkg/kv/kvclient/rangefeed:with-mocks", + "//pkg/roachpb:with-mocks", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/spanconfig/spanconfigkvaccessor", + "//pkg/sql/sqlutil", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/protoutil", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/spanconfig/spanconfigkvwatcher/kv_watcher.go b/pkg/spanconfig/spanconfigkvwatcher/kv_watcher.go new file mode 100644 index 000000000000..faa91fa21caa --- /dev/null +++ b/pkg/spanconfig/spanconfigkvwatcher/kv_watcher.go @@ -0,0 +1,129 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package spanconfigkvwatcher + +import ( + "context" + "strings" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/util/grpcutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" +) + +// KVWatcher is used to watch for span configuration changes over +// system.span_configurations. +type KVWatcher struct { + stopper *stop.Stopper + db *kv.DB + clock *hlc.Clock + rangeFeedFactory *rangefeed.Factory + + // spanConfigurationsTableID is typically the table ID for + // system.span_configurations, but overridable for testing purposes. + spanConfigurationsTableID uint32 +} + +// New instantiates a KVWatcher. +func New( + stopper *stop.Stopper, + db *kv.DB, + clock *hlc.Clock, + rangeFeedFactory *rangefeed.Factory, + spanConfigurationsTableID uint32, +) *KVWatcher { + return &KVWatcher{ + stopper: stopper, + db: db, + clock: clock, + rangeFeedFactory: rangeFeedFactory, + spanConfigurationsTableID: spanConfigurationsTableID, + } +} + +var _ spanconfig.KVWatcher = &KVWatcher{} + +// WatchForKVUpdates will kick off the KV span config watcher. It establishes a +// rangefeed over the span configurations table, and propagates updates to it +// through the returned channel. 
+func (w *KVWatcher) WatchForKVUpdates(ctx context.Context) (<-chan spanconfig.Update, error) { + updateCh := make(chan spanconfig.Update) + spanConfigTableStart := keys.SystemSQLCodec.IndexPrefix(w.spanConfigurationsTableID, keys.SpanConfigurationsTablePrimaryKeyIndexID) + spanConfigTableSpan := roachpb.Span{ + Key: spanConfigTableStart, + EndKey: spanConfigTableStart.PrefixEnd(), + } + + rowDecoder := newSpanConfigDecoder() + handleUpdate := func( + ctx context.Context, ev *roachpb.RangeFeedValue, + ) { + deleted := !ev.Value.IsPresent() + var value roachpb.Value + if deleted { + // Since the end key is not part of the primary key, we need to + // decode the previous value in order to determine what it is. + value = ev.PrevValue + } else { + value = ev.Value + } + entry, err := rowDecoder.decode(roachpb.KeyValue{ + Key: ev.Key, + Value: value, + }) + if err != nil { + log.Warningf(ctx, "failed to decode span configuration update: %v", err) + return + } + + if log.ExpensiveLogEnabled(ctx, 1) { + log.Infof(ctx, "received span configuration update for %s (deleted=%t)", entry.Span, deleted) + } + + update := spanconfig.Update{Span: entry.Span} + if !deleted { + update.Config = entry.Config + } + select { + case <-ctx.Done(): + case updateCh <- update: + } + } + + rf, err := w.rangeFeedFactory.RangeFeed( + ctx, "spanconfig-rangefeed", spanConfigTableSpan, w.clock.Now(), handleUpdate, + rangefeed.WithDiff(), + rangefeed.WithInitialScan(nil), + rangefeed.WithOnInitialScanError(func(ctx context.Context, err error) (shouldFail bool) { + // TODO(irfansharif): Consider if there are other errors which we + // want to treat as permanent. This was cargo culted from the + // settings watcher. 
+ if grpcutil.IsAuthError(err) || + strings.Contains(err.Error(), "rpc error: code = Unauthenticated") { + return true + } + return false + }), + ) + if err != nil { + return nil, err + } + w.stopper.AddCloser(rf) + + log.Info(ctx, "established range feed over span configurations table") + return updateCh, nil +} diff --git a/pkg/spanconfig/spanconfigkvwatcher/kv_watcher_test.go b/pkg/spanconfig/spanconfigkvwatcher/kv_watcher_test.go new file mode 100644 index 000000000000..396ebc185207 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvwatcher/kv_watcher_test.go @@ -0,0 +1,139 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvwatcher_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvwatcher" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestKVWatcher verifies that the watcher emits the right updates following +// changes made to the system.span_configurations table. 
+func TestKVWatcher(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + EnableExperimentalSpanConfigs: true, + }, + }) + defer tc.Stopper().Stop(ctx) + + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + tdb.Exec(t, `SET CLUSTER SETTING spanconfig.kvaccessor_experimental.enabled = true`) + + const dummyTableName = "dummy_span_configurations" + tdb.Exec(t, fmt.Sprintf("CREATE TABLE %s (LIKE system.span_configurations INCLUDING ALL)", dummyTableName)) + + var dummyTableID uint32 + tdb.QueryRow(t, fmt.Sprintf( + `SELECT table_id from crdb_internal.tables WHERE name = '%s'`, dummyTableName), + ).Scan(&dummyTableID) + + accessor := spanconfigkvaccessor.New( + tc.Server(0).DB(), + tc.Server(0).InternalExecutor().(sqlutil.InternalExecutor), + tc.Server(0).ClusterSettings(), + fmt.Sprintf("defaultdb.public.%s", dummyTableName), + ) + + span := func(start, end string) roachpb.Span { + return roachpb.Span{ + Key: roachpb.Key(start), + EndKey: roachpb.Key(end), + } + } + // ops is a list of operations to execute in order. + ops := []struct { + span roachpb.Span + config, prev roachpb.SpanConfig + delete bool + }{ + { + span: span("a", "c"), + config: roachpb.SpanConfig{}, + delete: false, + }, + { + span: span("d", "f"), + config: roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3}, + delete: false, + }, + { + span: span("d", "f"), + prev: roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3}, + delete: true, + }, + } + + ts := tc.Server(0) + + // initialWatcher is set up to watch the table before any updates are made + // to it. It should observe all changes, in order, including the deletions. 
+ initialWatcher := spanconfigkvwatcher.New(tc.Stopper(), ts.DB(), ts.Clock(), ts.RangeFeedFactory().(*rangefeed.Factory), dummyTableID) + initialUpdateCh, err := initialWatcher.WatchForKVUpdates(ctx) + require.NoError(t, err) + + for _, op := range ops { + if op.delete { + require.NoError(t, accessor.UpdateSpanConfigEntries(ctx, roachpb.Spans{op.span}, nil)) + } else { + update := []roachpb.SpanConfigEntry{{Span: op.span, Config: op.config}} + require.NoError(t, accessor.UpdateSpanConfigEntries(ctx, nil, update)) + } + } + + for _, op := range ops { + select { + case update := <-initialUpdateCh: + require.True(t, update.Span.Equal(op.span)) + require.Equal(t, update.Deletion, op.delete) + if op.delete { + require.True(t, update.Config.Equal(op.prev)) + } else { + require.True(t, update.Config.Equal(op.config)) + } + case <-time.After(5 * time.Second): + t.Errorf("test timed out") + } + } + + // finalWatcher is set up to watch the table after the deletes have + // occurred. It should only observe the final state of the table. + finalWatcher := spanconfigkvwatcher.New(tc.Stopper(), ts.DB(), ts.Clock(), ts.RangeFeedFactory().(*rangefeed.Factory), dummyTableID) + finalUpdateCh, err := finalWatcher.WatchForKVUpdates(ctx) + require.NoError(t, err) + + select { + case update := <-finalUpdateCh: + finalOp := ops[0] + require.True(t, update.Span.Equal(finalOp.span)) + require.True(t, update.Config.Equal(finalOp.config)) + require.False(t, update.Deletion()) + case <-time.After(5 * time.Second): + t.Errorf("test timed out") + } +} diff --git a/pkg/spanconfig/spanconfigkvwatcher/main_test.go b/pkg/spanconfig/spanconfigkvwatcher/main_test.go new file mode 100644 index 000000000000..b7a63f44f445 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvwatcher/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvwatcher_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/spanconfig/spanconfigkvwatcher/span_config_decoder.go b/pkg/spanconfig/spanconfigkvwatcher/span_config_decoder.go new file mode 100644 index 000000000000..962f6045b395 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvwatcher/span_config_decoder.go @@ -0,0 +1,118 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package spanconfigkvwatcher + +import ( + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" +) + +// spanConfigDecoder decodes rows from system.span_configurations. It's not +// safe for concurrent use. +type spanConfigDecoder struct { + alloc rowenc.DatumAlloc + colIdxMap catalog.TableColMap +} + +// newSpanConfigDecoder instantiates a spanConfigDecoder. +func newSpanConfigDecoder() *spanConfigDecoder { + return &spanConfigDecoder{ + colIdxMap: row.ColIDtoRowIndexFromCols( + systemschema.SpanConfigurationsTable.PublicColumns(), + ), + } +} + +// decode a span config entry given a KV from the +// system.span_configurations table. +func (sd *spanConfigDecoder) decode(kv roachpb.KeyValue) (entry roachpb.SpanConfigEntry, _ error) { + tbl := systemschema.SpanConfigurationsTable + // First we need to decode the start_key field from the index key. 
+ { + types := []*types.T{tbl.PublicColumns()[0].GetType()} + startKeyRow := make([]rowenc.EncDatum, 1) + _, matches, _, err := rowenc.DecodeIndexKey( + keys.SystemSQLCodec, tbl, tbl.GetPrimaryIndex(), + types, startKeyRow, nil, kv.Key, + ) + if err != nil { + return roachpb.SpanConfigEntry{}, errors.Wrap(err, "failed to decode key") + } + if !matches { + return roachpb.SpanConfigEntry{}, + errors.AssertionFailedf( + "system.span_configurations descriptor does not match key: %v", kv.Key, + ) + } + if err := startKeyRow[0].EnsureDecoded(types[0], &sd.alloc); err != nil { + return roachpb.SpanConfigEntry{}, err + } + entry.Span.Key = []byte(tree.MustBeDBytes(startKeyRow[0].Datum)) + } + if !kv.Value.IsPresent() { + return roachpb.SpanConfigEntry{}, + errors.Errorf("missing value for start key: %s", entry.Span.Key) + } + + // The remaining columns are stored as a family, packed with diff-encoded + // column IDs followed by their values. + { + bytes, err := kv.Value.GetTuple() + if err != nil { + return roachpb.SpanConfigEntry{}, err + } + var colIDDiff uint32 + var lastColID descpb.ColumnID + var res tree.Datum + for len(bytes) > 0 { + _, _, colIDDiff, _, err = encoding.DecodeValueTag(bytes) + if err != nil { + return roachpb.SpanConfigEntry{}, err + } + colID := lastColID + descpb.ColumnID(colIDDiff) + lastColID = colID + if idx, ok := sd.colIdxMap.Get(colID); ok { + res, bytes, err = rowenc.DecodeTableValue(&sd.alloc, tbl.PublicColumns()[idx].GetType(), bytes) + if err != nil { + return roachpb.SpanConfigEntry{}, err + } + + switch colID { + case tbl.PublicColumns()[1].GetID(): // end_key + entry.Span.EndKey = []byte(tree.MustBeDBytes(res)) + case tbl.PublicColumns()[2].GetID(): // config + if err := protoutil.Unmarshal([]byte(tree.MustBeDBytes(res)), &entry.Config); err != nil { + return roachpb.SpanConfigEntry{}, err + } + default: + return roachpb.SpanConfigEntry{}, errors.Errorf("unknown column: %v", colID) + } + } + } + } + + return entry, nil +} + +// 
TestingDecoderFn exports the decoding routine for testing purposes. +func TestingDecoderFn() func(roachpb.KeyValue) (roachpb.SpanConfigEntry, error) { + return newSpanConfigDecoder().decode +} diff --git a/pkg/spanconfig/spanconfigkvwatcher/span_config_decoder_test.go b/pkg/spanconfig/spanconfigkvwatcher/span_config_decoder_test.go new file mode 100644 index 000000000000..207b3bb320ef --- /dev/null +++ b/pkg/spanconfig/spanconfigkvwatcher/span_config_decoder_test.go @@ -0,0 +1,86 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvwatcher_test + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvwatcher" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/stretchr/testify/require" +) + +// TestSpanConfigDecoder verifies that we can decode rows stored in the +// system.span_configurations table. 
+func TestSpanConfigDecoder(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + // const dummyTableName = "defaultdb.public.dummy_span_configurations" + const dummyTableName = "dummy_span_configurations" + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + tdb.Exec(t, fmt.Sprintf("CREATE TABLE %s (LIKE system.span_configurations INCLUDING ALL)", dummyTableName)) + + var dummyTableID uint32 + tdb.QueryRow(t, fmt.Sprintf( + `SELECT table_id from crdb_internal.tables WHERE name = '%s'`, dummyTableName), + ).Scan(&dummyTableID) + + getCount := func() int { + explain := tdb.Query(t, fmt.Sprintf(`SELECT count(*) FROM %s`, dummyTableName)) + explain.Next() + var c int + require.Nil(t, explain.Scan(&c)) + require.Nil(t, explain.Close()) + return c + } + initialCount := getCount() + + key := tc.ScratchRange(t) + rng := tc.GetFirstStoreFromServer(t, 0).LookupReplica(keys.MustAddr(key)) + span := rng.Desc().RSpan().AsRawSpanWithNoLocals() + conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3} + + buf, err := protoutil.Marshal(&conf) + require.NoError(t, err) + tdb.Exec(t, fmt.Sprintf(`UPSERT INTO %s (start_key, end_key, config) VALUES ($1, $2, $3)`, + dummyTableName), span.Key, span.EndKey, buf) + require.Equal(t, initialCount+1, getCount()) + + k := keys.SystemSQLCodec.IndexPrefix(dummyTableID, keys.SpanConfigurationsTablePrimaryKeyIndexID) + rows, err := tc.Server(0).DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */) + require.NoError(t, err) + require.Len(t, rows, initialCount+1) + + last := rows[len(rows)-1] + got, err := spanconfigkvwatcher.TestingDecoderFn()( + roachpb.KeyValue{ + Key: last.Key, + Value: *last.Value, + }, + ) + require.NoError(t, err) + require.Truef(t, span.Equal(got.Span), + "expected span=%s, got span=%s", span, got.Span) + require.Truef(t, conf.Equal(got.Config), + "expected config=%s, got config=%s", conf, got.Config) 
+} diff --git a/pkg/spanconfig/spanconfigmanager/manager_test.go b/pkg/spanconfig/spanconfigmanager/manager_test.go index 7849796c6709..53b4fc399dac 100644 --- a/pkg/spanconfig/spanconfigmanager/manager_test.go +++ b/pkg/spanconfig/spanconfigmanager/manager_test.go @@ -19,11 +19,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -74,6 +76,7 @@ func TestManagerConcurrentJobCreation(t *testing.T) { ts.Stopper(), ts.ClusterSettings(), ts.SpanConfigAccessor().(spanconfig.KVAccessor), + spanconfigstore.New(roachpb.TestingDefaultSpanConfig()), &spanconfig.TestingKnobs{ ManagerCreatedJobInterceptor: func(jobI interface{}) { job := jobI.(*jobs.Job) @@ -161,6 +164,7 @@ func TestManagerStartsJobIfFailed(t *testing.T) { ts.Stopper(), ts.ClusterSettings(), ts.SpanConfigAccessor().(spanconfig.KVAccessor), + spanconfigstore.New(roachpb.TestingDefaultSpanConfig()), &spanconfig.TestingKnobs{ ManagerAfterCheckedReconciliationJobExistsInterceptor: func(exists bool) { require.False(t, exists) diff --git a/pkg/spanconfig/spanconfigstore/BUILD.bazel b/pkg/spanconfig/spanconfigstore/BUILD.bazel new file mode 100644 index 000000000000..4d4923a67c1b --- /dev/null +++ b/pkg/spanconfig/spanconfigstore/BUILD.bazel @@ -0,0 +1,36 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "spanconfigstore", + srcs = [ + 
"shadow.go", + "store.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore", + visibility = ["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/roachpb:with-mocks", + "//pkg/settings", + "//pkg/spanconfig", + "//pkg/util/interval", + "//pkg/util/log", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "spanconfigstore_test", + srcs = ["store_test.go"], + data = glob(["testdata/**"]), + embed = [":spanconfigstore"], + deps = [ + "//pkg/roachpb:with-mocks", + "//pkg/spanconfig", + "//pkg/util/leaktest", + "//pkg/util/randutil", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/spanconfig/spanconfigstore/shadow.go b/pkg/spanconfig/spanconfigstore/shadow.go new file mode 100644 index 000000000000..da2f48fa3653 --- /dev/null +++ b/pkg/spanconfig/spanconfigstore/shadow.go @@ -0,0 +1,82 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigstore + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" +) + +// ShadowReader wraps around two spanconfig.StoreReaders and logs warnings when +// there are divergent results from the two. +type ShadowReader struct { + new, old spanconfig.StoreReader +} + +// NewShadowReader instantiates a new shadow reader. 
+func NewShadowReader(new, old spanconfig.StoreReader) *ShadowReader { + return &ShadowReader{ + new: new, + old: old, + } +} + +var _ spanconfig.StoreReader = &ShadowReader{} + +// NeedsSplit is part of the spanconfig.StoreReader interface. +func (s *ShadowReader) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool { + new := s.new.NeedsSplit(ctx, start, end) + old := s.old.NeedsSplit(ctx, start, end) + if new != old && log.ExpensiveLogEnabled(ctx, 1) { + log.Warningf(ctx, "needs split: mismatched responses between old(%t) and new(%t) for start=%s end=%s", + old, new, start.String(), end.String()) + } + return new +} + +// ComputeSplitKey is part of the spanconfig.StoreReader interface. +func (s *ShadowReader) ComputeSplitKey(ctx context.Context, start, end roachpb.RKey) roachpb.RKey { + new := s.new.ComputeSplitKey(ctx, start, end) + old := s.old.ComputeSplitKey(ctx, start, end) + if !new.Equal(old) && log.ExpensiveLogEnabled(ctx, 1) { + str := func(k roachpb.RKey) string { + if len(k) == 0 { + return "" + } + return k.String() + } + + log.Warningf(ctx, "compute split key: mismatched responses between old(%s) and new(%s) for start=%s end=%s", + str(old), str(new), str(start), str(end)) + } + return new +} + +// GetSpanConfigForKey is part of the spanconfig.StoreReader interface. 
+func (s *ShadowReader) GetSpanConfigForKey( + ctx context.Context, key roachpb.RKey, +) (roachpb.SpanConfig, error) { + new, errNew := s.new.GetSpanConfigForKey(ctx, key) + old, errOld := s.old.GetSpanConfigForKey(ctx, key) + if log.ExpensiveLogEnabled(ctx, 1) { + if !new.Equal(old) { + log.Warningf(ctx, "get span config for key: mismatched responses between old(%s) and new(%s) for key=%s", old.String(), new.String(), key.String()) + } + if !errors.Is(errNew, errOld) { + log.Warningf(ctx, "get span config for key: mismatched errors between old(%s) and new(%s) for key=%s", errOld, errNew, key.String()) + } + } + return new, errNew +} diff --git a/pkg/spanconfig/spanconfigstore/store.go b/pkg/spanconfig/spanconfigstore/store.go new file mode 100644 index 000000000000..99e28d1df726 --- /dev/null +++ b/pkg/spanconfig/spanconfigstore/store.go @@ -0,0 +1,289 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigstore + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/util/interval" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// EnabledSetting is a hidden cluster setting to enable the use of the span +// configs infrastructure in KV. It switches each store in the cluster +// from using the gossip-backed system config span to instead using the span +// configs infrastructure.
It has no effect unless +// COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is set. +var EnabledSetting = settings.RegisterBoolSetting( + "spanconfig.experimental_store.enabled", + `use the span config infrastructure in KV instead of the system config span`, + false, +).WithSystemOnly() + +// Store is an in-memory data structure to store and retrieve span configs. +// Internally it makes use of an interval tree to store non-overlapping span +// configs. +type Store struct { + mu struct { + syncutil.RWMutex + tree interval.Tree + idAlloc int64 + } + + // TODO(irfansharif): We're using a static fall back span config here, we + // could instead have this track the host tenant's RANGE DEFAULT config, or + // go a step further and use the tenant's own RANGE DEFAULT instead if the + // key is within the tenant's keyspace. We'd have to thread that through the + // KVAccessor interface by reserving special keys for these default configs. + + // fallback is the span config we'll fall back on in the absence of + // something more specific. + fallback roachpb.SpanConfig +} + +var _ spanconfig.Store = &Store{} + +// New instantiates a span config store with the given fallback. +func New(fallback roachpb.SpanConfig) *Store { + s := &Store{fallback: fallback} + s.mu.tree = interval.NewTree(interval.ExclusiveOverlapper) + return s +} + +// NeedsSplit is part of the spanconfig.StoreReader interface. +func (s *Store) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool { + return len(s.ComputeSplitKey(ctx, start, end)) > 0 +} + +// ComputeSplitKey is part of the spanconfig.StoreReader interface. +func (s *Store) ComputeSplitKey(ctx context.Context, start, end roachpb.RKey) roachpb.RKey { + sp := roachpb.Span{Key: start.AsRawKey(), EndKey: end.AsRawKey()} + // We don't want to split within the system config span while we're still + // also using it to disseminate zone configs. 
+ // + // TODO(irfansharif): Once we've fully phased out the system config span, we + // can get rid of this special handling. + if keys.SystemConfigSpan.Contains(sp) { + return nil + } + if keys.SystemConfigSpan.ContainsKey(sp.Key) { + return roachpb.RKey(keys.SystemConfigSpan.EndKey) + } + + s.mu.RLock() + defer s.mu.RUnlock() + + // Visit the entries overlapping the given span; the start key of the second + // entry visited (if any) is the split key. + + idx := 0 + var splitKey roachpb.RKey = nil + s.mu.tree.DoMatching(func(i interval.Interface) (done bool) { + if idx > 0 { + splitKey = roachpb.RKey(i.(storeEntry).Span.Key) + return true // we found our split key, we're done + } + + idx++ + return false // more + }, sp.AsRange()) + + return splitKey +} + +// GetSpanConfigForKey is part of the spanconfig.StoreReader interface. +func (s *Store) GetSpanConfigForKey( + ctx context.Context, key roachpb.RKey, +) (roachpb.SpanConfig, error) { + sp := roachpb.Span{Key: key.AsRawKey(), EndKey: key.Next().AsRawKey()} + + var conf roachpb.SpanConfig + found := false + s.mu.tree.DoMatching(func(i interval.Interface) (done bool) { + conf = i.(storeEntry).Config + found = true + return true + }, sp.AsRange()) + + if !found { + if log.ExpensiveLogEnabled(ctx, 1) { + log.Warningf(ctx, "span config not found for %s", key.String()) + } + conf = s.fallback + } + return conf, nil +} + +// Apply is part of the spanconfig.StoreWriter interface.
+func (s *Store) Apply( + ctx context.Context, update spanconfig.Update, dryrun bool, +) (deleted []roachpb.Span, added []roachpb.SpanConfigEntry) { + s.mu.Lock() + defer s.mu.Unlock() + + if !update.Span.Valid() || len(update.Span.EndKey) == 0 { + log.Fatalf(ctx, "invalid span") + } + + entriesToDelete, entriesToAdd := s.accumulateOpsForLocked(update) + + deleted = make([]roachpb.Span, 0, len(entriesToDelete)) + for _, entry := range entriesToDelete { + entry := entry // copy out of loop variable + if !dryrun { + if err := s.mu.tree.Delete(entry, false); err != nil { + log.Fatalf(ctx, "%v", err) + } + } + deleted = append(deleted, entry.Span) + } + + added = make([]roachpb.SpanConfigEntry, 0, len(entriesToAdd)) + for _, entry := range entriesToAdd { + entry := entry // copy out of loop variable + if !dryrun { + if err := s.mu.tree.Insert(entry, false); err != nil { + log.Fatalf(ctx, "%v", err) + } + } + added = append(added, entry.SpanConfigEntry) + } + + return deleted, added +} + +// accumulateOpsForLocked returns the list of store entries that would be +// deleted and added if the given update were to be applied. To apply a given +// update, we want to find all overlapping spans and clear out just the +// intersections. If the update is adding a new span config, we'll also want to +// add its store entry after. We do this by deleting all overlapping spans in +// their entirety and re-adding the non-overlapping portions, if any.
+// Pseudo-code: +// +// for entry in store.overlapping(update.span): +// union, intersection = union(update.span, entry), intersection(update.span, entry) +// pre, post = span{union.start_key, intersection.start_key}, span{intersection.end_key, union.end_key} +// +// delete entry +// if entry.contains(update.span.start_key): +// add pre=entry.conf +// if entry.contains(update.span.end_key): +// add post=entry.conf +// +// if adding: +// add update.span=update.conf +// +func (s *Store) accumulateOpsForLocked(update spanconfig.Update) (toDelete, toAdd []storeEntry) { + for _, overlapping := range s.mu.tree.Get(update.Span.AsRange()) { + existing := overlapping.(storeEntry) + var ( + union = existing.Span.Combine(update.Span) + inter = existing.Span.Intersect(update.Span) + + pre = roachpb.Span{Key: union.Key, EndKey: inter.Key} + post = roachpb.Span{Key: inter.EndKey, EndKey: union.EndKey} + ) + + // Delete the existing span in its entirety. Below we'll re-add the + // non-intersecting parts of the span. + toDelete = append(toDelete, existing) + + if existing.Span.ContainsKey(update.Span.Key) { // existing entry contains the update span's start key + // ex: [-----------------) + // + // up: [-------) + // up: [-------------) + // up: [-------------- + // up: [-------) + // up: [-----------------) + // up: [------------------ + + // Re-add the non-intersecting spans, if any. + if pre.Valid() { + toAdd = append(toAdd, s.makeEntryLocked(pre, existing.Config)) + } + } + + if existing.Span.ContainsKey(update.Span.EndKey) { // existing entry contains the update span's end key + // ex: [-----------------) + // + // up: ------------------) + // up: [-----------------) + // up: [-------) + // up: -------------) + // up: [------------) + // up: [---------) + + // Re-add the non-intersecting spans, if any. 
+ if post.Valid() { + toAdd = append(toAdd, s.makeEntryLocked(post, existing.Config)) + } + } + } + + if update.Addition() { + if len(toDelete) == 1 && + toDelete[0].Span.Equal(update.Span) && + toDelete[0].Config.Equal(update.Config) { + // We're deleting exactly what we're going to add, this is a no-op. + return nil, nil + } + + // Add the update itself. + toAdd = append(toAdd, s.makeEntryLocked(update.Span, update.Config)) + + // TODO(irfansharif): If we're adding an entry, we could inspect the + // entries before and after and check whether either of them have the + // same config. If they do, we could coalesce them into a single span. + // Given that these boundaries determine where we split ranges, we'd be + // able to reduce the number of ranges drastically (think adjacent + // tables/indexes/partitions with the same config). This would be + // especially significant for secondary tenants, where we'd be able to + // avoid unconditionally splitting on table boundaries. We'd still want + // to split on tenant boundaries, so certain preconditions would need to + // hold. For performance reasons, we'd probably also want to offer + // a primitive to allow manually splitting on specific table boundaries. + } + + return toDelete, toAdd +} + +func (s *Store) makeEntryLocked(sp roachpb.Span, conf roachpb.SpanConfig) storeEntry { + s.mu.idAlloc++ + return storeEntry{ + SpanConfigEntry: roachpb.SpanConfigEntry{Span: sp, Config: conf}, + id: s.mu.idAlloc, + } +} + +// storeEntry is the type used to store and sort values in the span config +// store. +type storeEntry struct { + roachpb.SpanConfigEntry + id int64 +} + +var _ interval.Interface = storeEntry{} + +// Range implements interval.Interface. +func (s storeEntry) Range() interval.Range { + return s.Span.AsRange() +} + +// ID implements interval.Interface. 
+func (s storeEntry) ID() uintptr {
+	return uintptr(s.id) // id is the value handed out by makeEntryLocked
+}
diff --git a/pkg/spanconfig/spanconfigstore/store_test.go b/pkg/spanconfig/spanconfigstore/store_test.go
new file mode 100644
index 000000000000..edc7e4e0092c
--- /dev/null
+++ b/pkg/spanconfig/spanconfigstore/store_test.go
@@ -0,0 +1,320 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package spanconfigstore
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"regexp"
+	"strings"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/spanconfig"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/cockroachdb/datadriven"
+	"github.com/stretchr/testify/require"
+)
+
+// spanRe matches strings of the form "[start, end)" (the space after the comma
+// is optional), capturing both the "start" and "end" keys.
+var spanRe = regexp.MustCompile(`^\[(\w+),\s??(\w+)\)$`)
+
+// configRe matches a single word. It's a shorthand for declaring a unique
+// config.
+var configRe = regexp.MustCompile(`^(\w+)$`)
+
+func TestSpanRe(t *testing.T) {
+	for _, tc := range []struct {
+		input            string
+		expMatch         bool
+		expStart, expEnd string
+	}{
+		{"[a, b)", true, "a", "b"},
+		{"[acd, bfg)", true, "acd", "bfg"}, // multi character keys allowed
+		{"[a,b)", true, "a", "b"},          // separating space is optional
+		{"[ a,b) ", false, "", ""},         // extraneous spaces disallowed
+		{"[a,b ) ", false, "", ""},         // extraneous spaces disallowed
+		{"[a,, b)", false, "", ""},         // only single comma allowed
+		{" [a, b)", false, "", ""},         // need to start with '['
+		{"[a,b)x", false, "", ""},          // need to end with ')'
+	} {
+		require.Equalf(t, tc.expMatch, spanRe.MatchString(tc.input), "input = %s", tc.input)
+		if !tc.expMatch {
+			continue
+		}
+
+		matches := spanRe.FindStringSubmatch(tc.input)
+		require.Len(t, matches, 3)
+		start, end := matches[1], matches[2]
+		require.Equal(t, tc.expStart, start)
+		require.Equal(t, tc.expEnd, end)
+	}
+}
+
+// parseSpan is a helper function that constructs a roachpb.Span from a string
+// of the form "[start, end)".
+func parseSpan(t *testing.T, sp string) roachpb.Span {
+	if !spanRe.MatchString(sp) {
+		t.Fatalf("expected %s to match span regex", sp)
+	}
+
+	matches := spanRe.FindStringSubmatch(sp)
+	start, end := matches[1], matches[2]
+	return roachpb.Span{
+		Key:    roachpb.Key(start),
+		EndKey: roachpb.Key(end),
+	}
+}
+
+// parseConfig is a helper function that constructs a roachpb.SpanConfig that's
+// "tagged" with the given string (i.e. it carries a single constraint whose key
+// is the given string).
+func parseConfig(t *testing.T, conf string) roachpb.SpanConfig {
+	if !configRe.MatchString(conf) {
+		t.Fatalf("expected %s to match config regex", conf)
+	}
+	return roachpb.SpanConfig{
+		Constraints: []roachpb.ConstraintsConjunction{
+			{
+				Constraints: []roachpb.Constraint{
+					{
+						Key: conf,
+					},
+				},
+			},
+		},
+	}
+}
+
+// printSpan is a helper function that transforms roachpb.Span into a string of
+// the form "[start,end)". The span is assumed to have been constructed by the
+// parseSpan helper above.
+func printSpan(sp roachpb.Span) string {
+	return fmt.Sprintf("[%s,%s)", string(sp.Key), string(sp.EndKey))
+}
+
+// printSpanConfig is a helper function that transforms roachpb.SpanConfig into
+// a readable string. The span config is assumed to have been constructed by the
+// parseConfig helper above.
+func printSpanConfig(conf roachpb.SpanConfig) string {
+	return conf.Constraints[0].Constraints[0].Key // see parseConfig for what a "tagged" roachpb.SpanConfig translates to
+}
+
+// printSpanConfigEntry is a helper function that transforms
+// roachpb.SpanConfigEntry into a string of the form "[start,end):config". The
+// span and config are expected to have been constructed using the
+// parse{Span,Config} helpers above.
+func printSpanConfigEntry(entry roachpb.SpanConfigEntry) string {
+	return fmt.Sprintf("%s:%s", printSpan(entry.Span), printSpanConfig(entry.Config))
+}
+
+// TestingGetAllOverlapping is a testing only helper to retrieve the set of
+// overlapping entries in sorted order.
+func (s *Store) TestingGetAllOverlapping(
+	_ context.Context, sp roachpb.Span,
+) []roachpb.SpanConfigEntry {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	// Iterate over all overlapping ranges and return corresponding span config
+	// entries.
+	var res []roachpb.SpanConfigEntry
+	for _, overlapping := range s.mu.tree.Get(sp.AsRange()) {
+		res = append(res, overlapping.(storeEntry).SpanConfigEntry)
+	}
+	return res
+}
+
+func TestDatadriven(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
+		store := New(parseConfig(t, "FALLBACK"))
+		datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
+			var (
+				spanStr, confStr, keyStr string
+			)
+			switch d.Cmd {
+			case "set":
+				d.ScanArgs(t, "span", &spanStr)
+				d.ScanArgs(t, "conf", &confStr)
+				span, config := parseSpan(t, spanStr), parseConfig(t, confStr)
+
+				dryrun := d.HasArg("dryrun")
+				deleted, added := store.Apply(ctx, spanconfig.Update{Span: span, Config: config}, dryrun)
+
+				var b strings.Builder
+				for _, sp := range deleted {
+					b.WriteString(fmt.Sprintf("deleted %s\n", printSpan(sp)))
+				}
+				for _, ent := range added {
+					b.WriteString(fmt.Sprintf("added %s\n", printSpanConfigEntry(ent)))
+				}
+				return b.String()
+			case "delete":
+				d.ScanArgs(t, "span", &spanStr)
+				span := parseSpan(t, spanStr)
+
+				dryrun := d.HasArg("dryrun")
+				deleted, added := store.Apply(ctx, spanconfig.Update{Span: span}, dryrun)
+
+				var b strings.Builder
+				for _, sp := range deleted {
+					b.WriteString(fmt.Sprintf("deleted %s\n", printSpan(sp)))
+				}
+				for _, ent := range added {
+					b.WriteString(fmt.Sprintf("added %s\n", printSpanConfigEntry(ent)))
+				}
+				return b.String()
+			case "get":
+				d.ScanArgs(t, "key", &keyStr)
+				config, err := store.GetSpanConfigForKey(ctx, roachpb.RKey(keyStr))
+				require.NoError(t, err)
+				return fmt.Sprintf("conf=%s", printSpanConfig(config))
+
+			case "needs-split":
+				d.ScanArgs(t, "span", &spanStr)
+				span := parseSpan(t, spanStr)
+				start, end := roachpb.RKey(span.Key), roachpb.RKey(span.EndKey)
+				result := store.NeedsSplit(ctx, start, end)
+				return fmt.Sprintf("%t", result)
+
+			case "compute-split":
+				d.ScanArgs(t, "span", &spanStr)
+				span := parseSpan(t, spanStr)
+				start, end := roachpb.RKey(span.Key), roachpb.RKey(span.EndKey)
+				splitKey := store.ComputeSplitKey(ctx, start, end)
+				return fmt.Sprintf("key=%s", string(splitKey))
+
+			case "overlapping":
+				d.ScanArgs(t, "span", &spanStr)
+				span := parseSpan(t, spanStr)
+				entries := store.TestingGetAllOverlapping(ctx, span)
+				var results []string
+				for _, entry := range entries {
+					results = append(results, printSpanConfigEntry(entry))
+				}
+				return strings.Join(results, "\n")
+
+			default:
+			}
+
+			t.Fatalf("unknown command: %s", d.Cmd)
+			return ""
+		})
+	})
+}
+
+// TestRandomized randomly sets/deletes span configs for arbitrary keyspans
+// within some alphabet. For a test span, it then asserts that the config we
+// retrieve is what we expect to find from the store. It also ensures that all
+// ranges are non-overlapping.
+func TestRandomized(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	randutil.SeedForTests()
+	ctx := context.Background()
+	alphabet := "abcdefghijklmnopqrstuvwxyz"
+	configs := "ABCDEF"
+	ops := []string{"set", "del"}
+
+	genRandomSpan := func() roachpb.Span {
+		startIdx, endIdx := rand.Intn(len(alphabet)-1), 1+rand.Intn(len(alphabet)-1)
+		if startIdx == endIdx {
+			endIdx = (endIdx + 1) % len(alphabet)
+		}
+		if endIdx < startIdx {
+			startIdx, endIdx = endIdx, startIdx
+		}
+		spanStr := fmt.Sprintf("[%s, %s)", string(alphabet[startIdx]), string(alphabet[endIdx]))
+		sp := parseSpan(t, spanStr)
+		require.True(t, sp.Valid())
+		return sp
+	}
+
+	getRandomConf := func() roachpb.SpanConfig {
+		confStr := fmt.Sprintf("conf_%s", string(configs[rand.Intn(len(configs))]))
+		return parseConfig(t, confStr)
+	}
+
+	getRandomOp := func() string {
+		return ops[rand.Intn(len(ops))]
+	}
+
+	testSpan := parseSpan(t, "[f,g)") // pin a single character span to test with
+	var expConfig roachpb.SpanConfig
+	var expFound bool
+
+	const numOps = 5000
+	store := New(roachpb.TestingDefaultSpanConfig())
+	for i := 0; i < numOps; i++ {
+		sp, conf, op := genRandomSpan(), getRandomConf(), getRandomOp()
+		switch op {
+		case "set":
+			store.Apply(ctx, spanconfig.Update{Span: sp, Config: conf}, false)
+			if testSpan.Overlaps(sp) {
+				expConfig, expFound = conf, true
+			}
+		case "del":
+			store.Apply(ctx, spanconfig.Update{Span: sp}, false)
+			if testSpan.Overlaps(sp) {
+				expConfig, expFound = roachpb.SpanConfig{}, false
+			}
+		default:
+			t.Fatalf("unexpected op: %s", op)
+		}
+	}
+
+	overlappingConfigs := store.TestingGetAllOverlapping(ctx, testSpan)
+	if !expFound {
+		require.Len(t, overlappingConfigs, 0)
+	} else {
+		// Check to see that the set of overlapping span configs is exactly what
+		// we expect.
+		require.Len(t, overlappingConfigs, 1)
+		gotSpan, gotConfig := overlappingConfigs[0].Span, overlappingConfigs[0].Config
+
+		require.Truef(t, gotSpan.Contains(testSpan), "improper result: expected retrieved span (%s) to contain test span (%s)",
+			printSpan(gotSpan), printSpan(testSpan))
+		require.Truef(t, expConfig.Equal(gotConfig), "mismatched configs: expected %s, got %s",
+			printSpanConfig(expConfig), printSpanConfig(gotConfig))
+
+		// Ensure that the config accessed through the StoreReader interface is
+		// the same as above.
+		storeReaderConfig, err := store.GetSpanConfigForKey(ctx, roachpb.RKey(testSpan.Key))
+		require.NoError(t, err)
+		require.True(t, gotConfig.Equal(storeReaderConfig))
+	}
+
+	// Walk the entries pairwise so that every entry is validated against its
+	// immediate predecessor. Comparing against only the first entry would let
+	// mis-sorted or overlapping spans further along go unnoticed.
+	everythingSpan := parseSpan(t, fmt.Sprintf("[%s,%s)",
+		string(alphabet[0]), string(alphabet[len(alphabet)-1])))
+	entries := store.TestingGetAllOverlapping(ctx, everythingSpan)
+	for i := 1; i < len(entries); i++ {
+		last, cur := entries[i-1], entries[i]
+
+		// Span configs are returned in strictly sorted order.
+		require.Truef(t, last.Span.Key.Compare(cur.Span.Key) < 0,
+			"expected to find spans in strictly sorted order, found %s then %s",
+			printSpan(last.Span), printSpan(cur.Span))
+
+		// Span configs must also be non-overlapping.
+		require.Falsef(t, last.Span.Overlaps(cur.Span),
+			"expected non-overlapping spans, found %s and %s",
+			printSpan(last.Span), printSpan(cur.Span))
+	}
+}
diff --git a/pkg/spanconfig/spanconfigstore/testdata/basic b/pkg/spanconfig/spanconfigstore/testdata/basic
new file mode 100644
index 000000000000..2f894d29d056
--- /dev/null
+++ b/pkg/spanconfig/spanconfigstore/testdata/basic
@@ -0,0 +1,95 @@
+# Test basic get/set/delete operations where the spans retrieved are identical
+# to the ones being added/deleted, and are non-overlapping with respect to one
+# another.
+
+# Check that missing keys fallback to a static config.
+get key=b
+----
+conf=FALLBACK
+
+
+# Test that dryruns don't actually mutate anything.
+set span=[b,d) conf=A dryrun
+----
+added [b,d):A
+
+get key=b
+----
+conf=FALLBACK
+
+
+# Add span configs for real.
+set span=[b,d) conf=A
+----
+added [b,d):A
+
+set span=[f,h) conf=B
+----
+added [f,h):B
+
+
+# Check that a no-op operation shows up as much.
+set span=[f,h) conf=B
+----
+
+
+# Check that a few keys are as we'd expect.
+get key=b
+----
+conf=A
+
+get key=c
+----
+conf=A
+
+get key=f
+----
+conf=B
+
+get key=g
+----
+conf=B
+
+get key=h
+----
+conf=FALLBACK
+
+
+# Check that a delete dryrun does nothing.
+delete span=[f,h) dryrun
+----
+deleted [f,h)
+
+get key=f
+----
+conf=B
+
+
+# Delete a span for real.
+delete span=[f,h)
+----
+deleted [f,h)
+
+# Check that a no-op operation does nothing.
+delete span=[f,g)
+----
+
+delete span=[f,h)
+----
+
+# Check that keys are as we'd expect (including the deleted one).
+get key=b
+----
+conf=A
+
+get key=c
+----
+conf=A
+
+get key=f
+----
+conf=FALLBACK
+
+get key=g
+----
+conf=FALLBACK
diff --git a/pkg/spanconfig/spanconfigstore/testdata/internal b/pkg/spanconfig/spanconfigstore/testdata/internal
new file mode 100644
index 000000000000..5cffaf80713c
--- /dev/null
+++ b/pkg/spanconfig/spanconfigstore/testdata/internal
@@ -0,0 +1,42 @@
+# Test the store's internal view of overlapping span configs.
+ +overlapping span=[a,z) +---- + +set span=[b,d) conf=A +---- +added [b,d):A + +set span=[f,g) conf=B +---- +added [f,g):B + +overlapping span=[b,d) +---- +[b,d):A + +overlapping span=[b,g) +---- +[b,d):A +[f,g):B + +overlapping span=[b,j) +---- +[b,d):A +[f,g):B + +overlapping span=[a,j) +---- +[b,d):A +[f,g):B + +delete span=[f,g) +---- +deleted [f,g) + +overlapping span=[f,g) +---- + +overlapping span=[b,j) +---- +[b,d):A diff --git a/pkg/spanconfig/spanconfigstore/testdata/overlap b/pkg/spanconfig/spanconfigstore/testdata/overlap new file mode 100644 index 000000000000..f9c0ca1b5eaa --- /dev/null +++ b/pkg/spanconfig/spanconfigstore/testdata/overlap @@ -0,0 +1,88 @@ +# Test operations where the spans overlap with the existing ones. + +set span=[b,h) conf=A +---- +added [b,h):A + + +# Check that writing a span with a partial overlap first deletes the existing +# entry and adds three new ones. +set span=[d,f) conf=B +---- +deleted [b,h) +added [b,d):A +added [f,h):A +added [d,f):B + +overlapping span=[b,h) +---- +[b,d):A +[d,f):B +[f,h):A + + +# Check that writing a span that partially overlaps with multiple existing +# entries deletes all of them, and re-adds the right non-overlapping fragments +# with the right configs.. +set span=[c,e) conf=C +---- +deleted [b,d) +deleted [d,f) +added [b,c):A +added [e,f):B +added [c,e):C + +overlapping span=[b,h) +---- +[b,c):A +[c,e):C +[e,f):B +[f,h):A + +# Check that when a span being written to entirely envelopes an existing entry, +# that entry is deleted in its entirety. +delete span=[d,g) +---- +deleted [c,e) +deleted [e,f) +deleted [f,h) +added [c,d):C +added [g,h):A + +overlapping span=[b,h) +---- +[b,c):A +[c,d):C +[g,h):A + +# Validate that the right split points (span start keys) are surfaced. 
+needs-split span=[b,h) +---- +true + +compute-split span=[b,h) +---- +key=c + +set span=[b,g) conf=A +---- +deleted [b,c) +deleted [c,d) +added [b,g):A + +overlapping span=[b,h) +---- +[b,g):A +[g,h):A + +needs-split span=[b,h) +---- +true + +compute-split span=[b,h) +---- +key=g + +needs-split span=[h,z) +---- +false diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go index 28e5d3fada3b..545a279f1517 100644 --- a/pkg/sql/catalog/systemschema/system.go +++ b/pkg/sql/catalog/systemschema/system.go @@ -2237,7 +2237,14 @@ var ( ColumnIDs: []descpb.ColumnID{1, 2, 3}, }, }, - pk("start_key"), + descpb.IndexDescriptor{ + Name: tabledesc.PrimaryKeyIndexName, + ID: keys.SpanConfigurationsTablePrimaryKeyIndexID, + Unique: true, + KeyColumnNames: []string{"start_key"}, + KeyColumnDirections: singleASC, + KeyColumnIDs: singleID1, + }, ), func(tbl *descpb.TableDescriptor) { tbl.Checks = []*descpb.TableDescriptor_CheckConstraint{{ diff --git a/pkg/util/interval/btree_based_interval.go b/pkg/util/interval/btree_based_interval.go index e8e99c4f9008..a80d8d32369f 100644 --- a/pkg/util/interval/btree_based_interval.go +++ b/pkg/util/interval/btree_based_interval.go @@ -642,7 +642,7 @@ func (n *node) removeFromLeaf(i int, fast bool) (out Interface, shrunk bool) { // For the latter, we have to check: // a) left sibling has node to spare // b) right sibling has node to spare -// c) we must merge +// c) we must Merge // To simplify our code here, we handle cases #1 and #2 the same: // If a node doesn't have enough Interfaces, we make sure it does (using a,b,c). 
// We then simply redo our remove call, and the second time (regardless of @@ -801,7 +801,7 @@ func (n *node) stealFromRightChild(i int, fast bool) { // +---------- + // func (n *node) mergeWithRightChild(i int, fast bool) { - // merge + // Merge child := n.mutableChild(i) mergeItem := n.items.removeAt(i) mergeChild := n.children.removeAt(i + 1) diff --git a/pkg/util/interval/btree_based_interval_test.go b/pkg/util/interval/btree_based_interval_test.go index bcfe13ce9c06..10e3a9fdcbe9 100644 --- a/pkg/util/interval/btree_based_interval_test.go +++ b/pkg/util/interval/btree_based_interval_test.go @@ -491,9 +491,9 @@ func TestBTree(t *testing.T) { } // TestDeleteAfterRootNodeMerge verifies that delete from a leaf node works -// correctly after a merge which involves the root node. During the delete of a +// correctly after a Merge which involves the root node. During the delete of a // Interface from a leaf node, if the root node has only one Interface and takes -// part of a merge, the root does have any Interface after the merge. The +// part of a Merge, the root does have any Interface after the Merge. The // subsequent adjustment of node range should take this into account. func TestDeleteAfterRootNodeMerge(t *testing.T) { tree := newBTreeWithDegree(InclusiveOverlapper, 2) diff --git a/pkg/util/interval/range_group.go b/pkg/util/interval/range_group.go index 4e8d5f206a94..747a885ba55e 100644 --- a/pkg/util/interval/range_group.go +++ b/pkg/util/interval/range_group.go @@ -65,7 +65,7 @@ type RangeGroup interface { Iterator() RangeGroupIterator // Len returns the number of Ranges currently within the RangeGroup. // This will always be equal to or less than the number of ranges added, - // as ranges that overlap will merge to produce a single larger range. + // as ranges that overlap will Merge to produce a single larger range. 
Len() int fmt.Stringer } @@ -241,7 +241,7 @@ func (rl *rangeList) insertAtIdx(e *list.Element, n *rangeListNode, r Range, i i // range increases the range of the rangeList, in which case it will be // added to the list, and false if it does not increase the range, in which // case it won't be added. If the range is added, the function will also attempt -// to merge any ranges within the list that now overlap. +// to Merge any ranges within the list that now overlap. func (rl *rangeList) Add(r Range) bool { prev, cur, inCur := rl.findNode(r, true /* inclusive */) @@ -284,10 +284,10 @@ func (rl *rangeList) Add(r Range) bool { // // In this example, n.slots[i] is the first existing range that overlaps // with the new range. - newR := merge(nr, r) + newR := Merge(nr, r) n.slots[i] = newR - // Each iteration attempts to merge all of the ranges in a rangeListNode. + // Each iteration attempts to Merge all of the ranges in a rangeListNode. mergeElem := cur origNode := true for { @@ -300,14 +300,14 @@ func (rl *rangeList) Add(r Range) bool { mergeStart = i + 1 } - // Each iteration attempts to merge a single range into the current - // merge batch. + // Each iteration attempts to Merge a single range into the current + // Merge batch. j := mergeStart for ; j < origLen; j++ { mergeR := mergeN.slots[j] if overlapsInclusive(newR, mergeR) { - newR = merge(newR, mergeR) + newR = Merge(newR, mergeR) n.slots[i] = newR mergeN.len-- rl.len-- @@ -593,7 +593,7 @@ func (rk rangeKey) String() string { // range increases the range of the rangeTree, in which case it will be // added to the tree, and false if it does not increase the range, in which // case it won't be added. If the range is added, the function will also attempt -// to merge any ranges within the tree that now overlap. +// to Merge any ranges within the tree that now overlap. 
func (rt *rangeTree) Add(r Range) bool { if err := rangeError(r); err != nil { panic(err) @@ -615,10 +615,10 @@ func (rt *rangeTree) Add(r Range) bool { } // Merge as many ranges as possible, and replace old range. - first.r = merge(first.r, r) + first.r = Merge(first.r, r) for _, o := range overlaps[1:] { other := o.(*rangeKey) - first.r = merge(first.r, other.r) + first.r = Merge(first.r, other.r) if err := rt.t.Delete(o, true /* fast */); err != nil { panic(err) } @@ -756,9 +756,9 @@ func contains(out, in Range) bool { return in.Start.Compare(out.Start) >= 0 && out.End.Compare(in.End) >= 0 } -// merge merges the provided ranges together into their union range. The +// Merge merges the provided ranges together into their union range. The // ranges must overlap or the function will not produce the correct output. -func merge(l, r Range) Range { +func Merge(l, r Range) Range { start := l.Start if r.Start.Compare(start) < 0 { start = r.Start