From 3514e3c57dccd7041ea443eef34ca888dffb7d62 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 28 Oct 2021 16:13:44 -0400 Subject: [PATCH] spanconfig: introduce the spanconfig.KVSubscriber KVSubscriber presents a consistent[^1] snapshot of a spanconfig.StoreReader that's incrementally maintained with changes made to the global span configurations state. The maintenance happens transparently; callers can subscribe to learn about what key spans may have seen a configuration change. After learning about a span update, consulting the embedded StoreReader would retrieve an up-to-date[^2] config for it. When a callback is first installed, it's invoked with the [min,max) span -- a shorthand to indicate that callers should consult the StoreReader for all spans of interest. Subsequent updates are of the more incremental kind. It's possible that the span updates received are no-ops, i.e. consulting the StoreReader for the given span would retrieve the last config observed for the span[^3]. type KVSubscriber interface { StoreReader Subscribe(func(updated roachpb.Span)) } Internally we maintain a rangefeed over the global store of span configurations (system.span_configurations), applying updates from it into an embedded spanconfig.Store. A read-only view of this data structure (spanconfig.StoreReader) is exposed as part of the KVSubscriber interface. Rangefeeds used as-is don't offer any ordering guarantees with respect to updates made over non-overlapping keys, which is something we care about[^4]. For that reason we make use of a rangefeed buffer, accumulating raw rangefeed updates and flushing them out en masse in timestamp order when the rangefeed frontier is bumped[^5]. If the buffer overflows (as dictated by the memory limit the KVSubscriber is instantiated with), the subscriber is wound down and an appropriate error is returned to the caller. 
When running into the errors above, it's safe for the caller to re-subscribe to effectively re-establish the underlying rangefeeds. When re-establishing a new rangefeed and populating a spanconfig.Store using the contents of the initial scan[^6], we wish to preserve the existing spanconfig.StoreReader. Discarding it would entail either blocking all external readers until a new spanconfig.StoreReader was fully populated, or presenting an inconsistent view of the spanconfig.Store that's currently being populated. For new rangefeeds what we do then is route all updates from the initial scan to a fresh spanconfig.Store, and once the initial scan is done, swap the source for the exported spanconfig.StoreReader. During the initial scan, concurrent readers would continue to observe the last spanconfig.StoreReader if any. After the swap, they would observe the more up-to-date source instead. Future incremental updates will also target the new source. When this source swap occurs, we inform any handlers of the need to possibly refresh their view of all configs. This commit also wires up the KVSubscriber into KV stores, replacing the use of the gossiped system config span (possible given the StoreReader interface, only happens if a testing flag/env var is set). [^1]: The contents of the StoreReader at t1 correspond exactly to the contents of the global span configuration state at t0 where t0 <= t1. If the StoreReader is read from at t2 where t2 > t1, it's guaranteed to observe a view of the global state at t >= t0. [^2]: For the canonical KVSubscriber implementation, this is typically lagging by the closed timestamp target duration. [^3]: The canonical KVSubscriber implementation internally re-establishes feeds when errors occur, possibly re-transmitting earlier updates (usually through a lazy [min,max) span) despite possibly not needing to. We could do a bit better and diff the two data structures, emitting only targeted updates. 
[^4]: For a given key k, its config may be stored as part of a larger span S (where S.start <= k < S.end). It's possible for S to get deleted and replaced with sub-spans S1...SN in the same transaction if the span is getting split. When applying these updates, we need to make sure to process the deletion event for S before processing S1...SN. [^5]: In our example above deleting the config for S and adding configs for S1...SN, we want to make sure that we apply the full set of updates all at once -- lest we expose the intermediate state where the config for S was deleted but the configs for S1...SN were not yet applied. [^6]: When tearing down the subscriber due to underlying errors, we could also surface a checkpoint to use the next time the subscriber is established. That way we can avoid the full initial scan over the span configuration state and simply pick up where we left off with our existing spanconfig.Store. Release note: None --- pkg/BUILD.bazel | 1 + pkg/keys/constants.go | 9 +- pkg/keys/spans.go | 3 + pkg/kv/kvclient/rangefeed/rangefeed.go | 6 +- pkg/kv/kvserver/BUILD.bazel | 5 + pkg/kv/kvserver/client_spanconfigs_test.go | 119 +++++ pkg/kv/kvserver/store.go | 159 ++++++- pkg/kv/kvserver/testing_knobs.go | 4 + pkg/server/BUILD.bazel | 1 + pkg/server/server.go | 27 ++ pkg/spanconfig/spanconfig.go | 72 ++- pkg/spanconfig/spanconfigjob/job.go | 4 +- .../spanconfigkvsubscriber/BUILD.bazel | 75 +++ .../spanconfigkvsubscriber/datadriven_test.go | 312 +++++++++++++ .../spanconfigkvsubscriber/kvsubscriber.go | 437 ++++++++++++++++++ .../kvsubscriber_test.go | 18 + .../spanconfigkvsubscriber/main_test.go | 31 ++ .../span_config_decoder.go | 118 +++++ .../span_config_decoder_test.go | 128 +++++ .../spanconfigkvsubscriber/testdata/basic | 48 ++ .../testdata/buffer_overflow | 83 ++++ .../testdata/initial_state | 27 ++ pkg/spanconfig/spanconfigstore/BUILD.bazel | 1 + pkg/spanconfig/spanconfigstore/shadow.go | 4 + pkg/spanconfig/spanconfigstore/store.go | 14 +- 
pkg/spanconfig/testing_knobs.go | 24 +- pkg/sql/catalog/systemschema/system.go | 9 +- 27 files changed, 1697 insertions(+), 42 deletions(-) create mode 100644 pkg/kv/kvserver/client_spanconfigs_test.go create mode 100644 pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel create mode 100644 pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go create mode 100644 pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go create mode 100644 pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go create mode 100644 pkg/spanconfig/spanconfigkvsubscriber/main_test.go create mode 100644 pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder.go create mode 100644 pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder_test.go create mode 100644 pkg/spanconfig/spanconfigkvsubscriber/testdata/basic create mode 100644 pkg/spanconfig/spanconfigkvsubscriber/testdata/buffer_overflow create mode 100644 pkg/spanconfig/spanconfigkvsubscriber/testdata/initial_state diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 38d9147867af..c04272934912 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -181,6 +181,7 @@ ALL_TESTS = [ "//pkg/server:server_test", "//pkg/settings:settings_test", "//pkg/spanconfig/spanconfigkvaccessor:spanconfigkvaccessor_test", + "//pkg/spanconfig/spanconfigkvsubscriber:spanconfigkvsubscriber_test", "//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test", "//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test", "//pkg/spanconfig/spanconfigstore:spanconfigstore_test", diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index eaeda96e7a98..35def95cd9e7 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -357,10 +357,11 @@ const ( ZonesTableConfigColumnID = 2 ZonesTableConfigColFamID = 2 - DescriptorTablePrimaryKeyIndexID = 1 - DescriptorTableDescriptorColID = 2 - DescriptorTableDescriptorColFamID = 2 - TenantsTablePrimaryKeyIndexID = 1 + DescriptorTablePrimaryKeyIndexID = 1 + DescriptorTableDescriptorColID = 2 + 
DescriptorTableDescriptorColFamID = 2 + TenantsTablePrimaryKeyIndexID = 1 + SpanConfigurationsTablePrimaryKeyIndexID = 1 // Reserved IDs for other system tables. Note that some of these IDs refer // to "Ranges" instead of a Table - these IDs are needed to store custom diff --git a/pkg/keys/spans.go b/pkg/keys/spans.go index d611b2ee8a02..02e513ada545 100644 --- a/pkg/keys/spans.go +++ b/pkg/keys/spans.go @@ -13,6 +13,9 @@ package keys import "github.com/cockroachdb/cockroach/pkg/roachpb" var ( + // EverythingSpan is a span that covers everything. + EverythingSpan = roachpb.Span{Key: roachpb.KeyMin, EndKey: roachpb.KeyMax} + // Meta1Span holds all first level addressing records. Meta1Span = roachpb.Span{Key: roachpb.KeyMin, EndKey: Meta2Prefix} diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index 0ef98e9281ef..ed2e74ac109e 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -193,9 +193,9 @@ func (f *RangeFeed) Start(ctx context.Context) error { return nil } -// Close closes the RangeFeed and waits for it to shut down; it does -// idempotently. It's guaranteed that no future handlers will be invoked after -// this point. +// Close closes the RangeFeed and waits for it to shut down; it does so +// idempotently. It waits for the currently running handler, if any, to complete +// and guarantees that no future handlers will be invoked after this point. 
func (f *RangeFeed) Close() { f.closeOnce.Do(func() { f.cancel() diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 96aa6e48f6de..5a8171940b37 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -151,6 +151,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigstore", "//pkg/sql/sessiondata", "//pkg/sql/sqlutil", "//pkg/storage", @@ -218,6 +219,7 @@ go_test( "client_replica_backpressure_test.go", "client_replica_gc_test.go", "client_replica_test.go", + "client_spanconfigs_test.go", "client_split_burst_test.go", "client_split_test.go", "client_status_test.go", @@ -340,6 +342,7 @@ go_test( "//pkg/server/telemetry", "//pkg/settings/cluster", "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigstore", "//pkg/sql", "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catalogkeys", @@ -349,6 +352,8 @@ go_test( "//pkg/sql/catalog/tabledesc", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", + "//pkg/sql/sqlutil", "//pkg/storage", "//pkg/storage/enginepb", "//pkg/storage/fs", diff --git a/pkg/kv/kvserver/client_spanconfigs_test.go b/pkg/kv/kvserver/client_spanconfigs_test.go new file mode 100644 index 000000000000..74efc495d655 --- /dev/null +++ b/pkg/kv/kvserver/client_spanconfigs_test.go @@ -0,0 +1,119 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvserver_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestSpanConfigUpdateAppliedToReplica ensures that when a store learns of a +// span config update, it installs the corresponding config on the right +// replica. +func TestSpanConfigUpdateAppliedToReplica(t *testing.T) { + defer leaktest.AfterTest(t)() + + spanConfigStore := spanconfigstore.New(roachpb.TestingDefaultSpanConfig()) + mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore) + + ctx := context.Background() + + args := base.TestServerArgs{ + EnableSpanConfigs: true, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + SpanConfig: &spanconfig.TestingKnobs{ + StoreKVSubscriberOverride: mockSubscriber, + }, + }, + } + s, _, _ := serverutils.StartServer(t, args) + defer s.Stopper().Stop(context.Background()) + + _, err := s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", nil, + sessiondata.InternalExecutorOverride{User: security.RootUserName()}, + `SET CLUSTER SETTING spanconfig.experimental_store.enabled = true`) + require.NoError(t, err) + + key, err := s.ScratchRange() + require.NoError(t, err) + store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID()) + 
require.NoError(t, err) + repl := store.LookupReplica(keys.MustAddr(key)) + span := repl.Desc().RSpan().AsRawSpanWithNoLocals() + conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3} + + deleted, added := spanConfigStore.Apply(ctx, spanconfig.Update{Span: span, Config: conf}, false /* dryrun */) + require.Empty(t, deleted) + require.Len(t, added, 1) + require.True(t, added[0].Span.Equal(span)) + require.True(t, added[0].Config.Equal(conf)) + + require.NotNil(t, mockSubscriber.callback) + mockSubscriber.callback(span) // invoke the callback + testutils.SucceedsSoon(t, func() error { + repl := store.LookupReplica(keys.MustAddr(key)) + gotConfig := repl.SpanConfig() + if !gotConfig.Equal(conf) { + return errors.Newf("expected config=%s, got config=%s", conf.String(), gotConfig.String()) + } + return nil + }) +} + +func newMockSpanConfigSubscriber(store spanconfig.Store) *mockSpanConfigSubscriber { + return &mockSpanConfigSubscriber{Store: store} +} + +type mockSpanConfigSubscriber struct { + callback func(config roachpb.Span) + spanconfig.Store +} + +func (m *mockSpanConfigSubscriber) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool { + return m.Store.NeedsSplit(ctx, start, end) +} + +func (m *mockSpanConfigSubscriber) ComputeSplitKey( + ctx context.Context, start, end roachpb.RKey, +) roachpb.RKey { + return m.Store.ComputeSplitKey(ctx, start, end) +} + +func (m *mockSpanConfigSubscriber) GetSpanConfigForKey( + ctx context.Context, key roachpb.RKey, +) (roachpb.SpanConfig, error) { + return m.Store.GetSpanConfigForKey(ctx, key) +} + +func (m *mockSpanConfigSubscriber) Subscribe(callback func(roachpb.Span)) { + m.callback = callback +} + +var _ spanconfig.KVSubscriber = &mockSpanConfigSubscriber{} diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index c3019eb04ab1..9217b8fa2272 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -53,6 +53,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" 
"github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -217,7 +218,6 @@ func testStoreConfig(clock *hlc.Clock, version roachpb.Version) StoreConfig { tracer := tracing.NewTracerWithOpt(context.TODO(), tracing.WithClusterSettings(&st.SV)) sc := StoreConfig{ DefaultSpanConfig: zonepb.DefaultZoneConfigRef().AsSpanConfig(), - DefaultSystemSpanConfig: zonepb.DefaultSystemZoneConfigRef().AsSpanConfig(), Settings: st, AmbientCtx: log.AmbientContext{Tracer: tracer}, Clock: clock, @@ -651,6 +651,7 @@ type Store struct { computeInitialMetrics sync.Once systemConfigUpdateQueueRateLimiter *quotapool.RateLimiter + spanConfigUpdateQueueRateLimiter *quotapool.RateLimiter } var _ kv.Sender = &Store{} @@ -663,18 +664,17 @@ type StoreConfig struct { AmbientCtx log.AmbientContext base.RaftConfig - DefaultSpanConfig roachpb.SpanConfig - DefaultSystemSpanConfig roachpb.SpanConfig - Settings *cluster.Settings - Clock *hlc.Clock - DB *kv.DB - Gossip *gossip.Gossip - NodeLiveness *liveness.NodeLiveness - StorePool *StorePool - Transport *RaftTransport - NodeDialer *nodedialer.Dialer - RPCContext *rpc.Context - RangeDescriptorCache *rangecache.RangeCache + DefaultSpanConfig roachpb.SpanConfig + Settings *cluster.Settings + Clock *hlc.Clock + DB *kv.DB + Gossip *gossip.Gossip + NodeLiveness *liveness.NodeLiveness + StorePool *StorePool + Transport *RaftTransport + NodeDialer *nodedialer.Dialer + RPCContext *rpc.Context + RangeDescriptorCache *rangecache.RangeCache ClosedTimestampSender *sidetransport.Sender ClosedTimestampReceiver sidetransportReceiver @@ -759,6 +759,10 @@ type StoreConfig struct { // SpanConfigsEnabled determines whether we're able to use the span configs // infrastructure. 
SpanConfigsEnabled bool + // Used to subscribe to span configuration changes, keeping up-to-date a + // data structure useful for retrieving span configs. Only available if + // SpanConfigsEnabled. + SpanConfigSubscriber spanconfig.KVSubscriber // KVAdmissionController is an optional field used for admission control. KVAdmissionController KVAdmissionController @@ -1635,6 +1639,26 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { }) } + if s.cfg.SpanConfigsEnabled { + s.cfg.SpanConfigSubscriber.Subscribe(func(update roachpb.Span) { + s.onSpanConfigUpdate(ctx, update) + }) + + // When toggling between the system config span and the span configs + // infrastructure, we want to re-apply configs on all replicas from + // whatever the new source is. + spanconfigstore.EnabledSetting.SetOnChange(&s.ClusterSettings().SV, func(ctx context.Context) { + enabled := spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) + if enabled { + s.applyAllFromSpanConfigStore(ctx) + } else { + if s.cfg.Gossip != nil && s.cfg.Gossip.GetSystemConfig() != nil { + s.systemGossipUpdate(s.cfg.Gossip.GetSystemConfig()) + } + } + }) + } + if !s.cfg.TestingKnobs.DisableAutomaticLeaseRenewal { s.startLeaseRenewer(ctx) } @@ -1776,6 +1800,10 @@ func (s *Store) GetConfReader() (spanconfig.StoreReader, error) { return nil, errSysCfgUnavailable } + if s.cfg.SpanConfigsEnabled && spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) { + return spanconfigstore.NewShadowReader(s.cfg.SpanConfigSubscriber, sysCfg), nil + } + return sysCfg, nil } @@ -1930,11 +1958,11 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) { // We'll want to offer all replicas to the split and merge queues. Be a little // careful about not spawning too many individual goroutines. + shouldQueue := s.systemConfigUpdateQueueRateLimiter.AdmitN(1) // For every range, update its zone config and check if it needs to // be split or merged. 
now := s.cfg.Clock.NowAsClockTimestamp() - shouldQueue := s.systemConfigUpdateQueueRateLimiter.AdmitN(1) newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { key := repl.Desc().StartKey conf, err := sysCfg.GetSpanConfigForKey(ctx, key) @@ -1957,6 +1985,109 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) { }) } +// onSpanConfigUpdate is the callback invoked whenever this store learns of a +// span config update. +func (s *Store) onSpanConfigUpdate(ctx context.Context, updated roachpb.Span) { + if !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) { + return + } + + sp, err := keys.SpanAddr(updated) + if err != nil { + log.Errorf(ctx, "skipped applying update, unexpected error resolving span address: %v", err) + return + } + + now := s.cfg.Clock.NowAsClockTimestamp() + if err := s.mu.replicasByKey.VisitKeyRange(ctx, sp.Key, sp.EndKey, AscendingKeyOrder, + func(ctx context.Context, it replicaOrPlaceholder) error { + repl := it.repl + if repl == nil { + return nil // placeholder; ignore + } + + startKey := repl.Desc().StartKey + if !updated.ContainsKey(startKey.AsRawKey()) { + // It's possible that the update we're receiving here is the + // right-hand side of a span config getting split. Think of + // installing a zone config on some partition of an index where + // previously there was none on any of the partitions. The range + // spanning the entire index would have to split on the + // partition boundary, and before it does so, it's possible that + // it would receive a span config update for just the partition. + // + // To avoid clobbering the pre-split range's embedded span + // config with the partition's config, we'll ensure that the + // range's start key is part of the update. We don't have to + // enqueue the range in the split queue here, that takes place + // when processing the left-hand side span config update. 
+ + return nil // ignore + } + + // TODO(irfansharif): It's possible for a config to be applied over an + // entire range when it only pertains to the first half of the range. + // This will be corrected shortly -- we enqueue the range for a split + // below where we then apply the right config on each half. But still, + // it's surprising behavior and gets in the way of a desirable + // consistency guarantee: a key's config at any point in time is one + // that was explicitly declared over it, or the default config. + // + // We can do better, we can skip applying the config entirely and + // enqueue the split, then relying on the split trigger to install + // the right configs on each half. The current structure is as it is + // to maintain parity with the system config span variant. + + replCtx := repl.AnnotateCtx(ctx) + conf, err := s.cfg.SpanConfigSubscriber.GetSpanConfigForKey(replCtx, startKey) + if err != nil { + log.Errorf(ctx, "skipped applying update, unexpected error reading from subscriber: %v", err) + return err + } + repl.SetSpanConfig(conf) + + // TODO(irfansharif): For symmetry with the system config span variant, + // we queue blindly; we could instead only queue it if we knew the + // range's keyspans has a split in there somewhere, or was now part of a + // larger range and eligible for a merge. + s.splitQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + s.mergeQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + return nil // more + }, + ); err != nil { + // Errors here should not be possible, but if there is one, log loudly. + log.Errorf(ctx, "unexpected error visiting replicas: %v", err) + } +} + +// applyAllFromSpanConfigStore applies, on each replica, span configs from the +// embedded span config store. 
+func (s *Store) applyAllFromSpanConfigStore(ctx context.Context) { + now := s.cfg.Clock.NowAsClockTimestamp() + newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { + replCtx := repl.AnnotateCtx(ctx) + key := repl.Desc().StartKey + conf, err := s.cfg.SpanConfigSubscriber.GetSpanConfigForKey(replCtx, key) + if err != nil { + log.Errorf(ctx, "skipped applying config update, unexpected error reading from subscriber: %v", err) + return true // more + } + + repl.SetSpanConfig(conf) + s.splitQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + s.mergeQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + return true // more + }) +} + func (s *Store) asyncGossipStore(ctx context.Context, reason string, useCached bool) { if err := s.stopper.RunAsyncTask( ctx, fmt.Sprintf("storage.Store: gossip on %s", reason), diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 786b01a86c98..eba3f2c6028d 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -327,6 +328,9 @@ type StoreTestingKnobs struct { // PurgeOutdatedReplicasInterceptor intercepts attempts to purge outdated // replicas in the store. PurgeOutdatedReplicasInterceptor func() + // SpanConfigUpdateInterceptor is called after the store hears about a span + // config update. 
+ SpanConfigUpdateInterceptor func(spanconfig.Update) // If set, use the given version as the initial replica version when // bootstrapping ranges. This is used for testing the migration // infrastructure. diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 867dce5b597f..80ac4fd11f22 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -113,6 +113,7 @@ go_library( "//pkg/spanconfig", "//pkg/spanconfig/spanconfigjob", "//pkg/spanconfig/spanconfigkvaccessor", + "//pkg/spanconfig/spanconfigkvsubscriber", "//pkg/spanconfig/spanconfigmanager", "//pkg/spanconfig/spanconfigsqltranslator", "//pkg/sql", diff --git a/pkg/server/server.go b/pkg/server/server.go index d805f5e54517..480db62f0864 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -71,6 +71,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" _ "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob" // register jobs declared outside of pkg/sql "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/contention" @@ -177,6 +178,8 @@ type Server struct { protectedtsProvider protectedts.Provider protectedtsReconciler *ptreconcile.Reconciler + spanConfigSubscriber *spanconfigkvsubscriber.KVSubscriber + sqlServer *SQLServer drainSleepFn func(time.Duration) @@ -612,8 +615,26 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { } var spanConfigAccessor spanconfig.KVAccessor + var spanConfigSubscriber *spanconfigkvsubscriber.KVSubscriber if cfg.SpanConfigsEnabled { storeCfg.SpanConfigsEnabled = true + spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs) + if spanConfigKnobs != nil && spanConfigKnobs.StoreKVSubscriberOverride != nil { + storeCfg.SpanConfigSubscriber = 
spanConfigKnobs.StoreKVSubscriberOverride + } else { + spanConfigSubscriber = spanconfigkvsubscriber.New( + stopper, + db, + clock, + rangeFeedFactory, + keys.SpanConfigurationsTableID, + 1<<20, /* 1 MB */ + storeCfg.DefaultSpanConfig, + spanConfigKnobs, + ) + storeCfg.SpanConfigSubscriber = spanConfigSubscriber + } + spanConfigAccessor = spanconfigkvaccessor.New( db, internalExecutor, cfg.Settings, systemschema.SpanConfigurationsTableName.FQString(), @@ -808,6 +829,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { replicationReporter: replicationReporter, protectedtsProvider: protectedtsProvider, protectedtsReconciler: protectedtsReconciler, + spanConfigSubscriber: spanConfigSubscriber, sqlServer: sqlServer, drainSleepFn: drainSleepFn, externalStorageBuilder: externalStorageBuilder, @@ -1726,6 +1748,11 @@ func (s *Server) PreStart(ctx context.Context) error { return err } + if s.cfg.SpanConfigsEnabled && s.spanConfigSubscriber != nil { + if err := s.spanConfigSubscriber.Start(ctx); err != nil { + return err + } + } // Start garbage collecting system events. // // NB: As written, this falls awkwardly between SQL and KV. KV is used only diff --git a/pkg/spanconfig/spanconfig.go b/pkg/spanconfig/spanconfig.go index 06cb134ef1cd..f5ef9f204979 100644 --- a/pkg/spanconfig/spanconfig.go +++ b/pkg/spanconfig/spanconfig.go @@ -24,7 +24,10 @@ import ( type KVAccessor interface { // GetSpanConfigEntriesFor returns the span configurations that overlap with // the given spans. - GetSpanConfigEntriesFor(ctx context.Context, spans []roachpb.Span) ([]roachpb.SpanConfigEntry, error) + GetSpanConfigEntriesFor( + ctx context.Context, + spans []roachpb.Span, + ) ([]roachpb.SpanConfigEntry, error) // UpdateSpanConfigEntries updates configurations for the given spans. 
This // is a "targeted" API: the spans being deleted are expected to have been @@ -33,35 +36,72 @@ type KVAccessor interface { // divvying up an existing span into multiple others with distinct configs, // callers are to issue a delete for the previous span and upserts for the // new ones. - UpdateSpanConfigEntries(ctx context.Context, toDelete []roachpb.Span, toUpsert []roachpb.SpanConfigEntry) error + UpdateSpanConfigEntries( + ctx context.Context, + toDelete []roachpb.Span, + toUpsert []roachpb.SpanConfigEntry, + ) error +} + +// KVSubscriber presents a consistent[1] snapshot of a StoreReader that's +// incrementally maintained with changes made to the global span configurations +// state (system.span_configurations). The maintenance happens transparently; +// callers can subscribe to learn about what key spans may have seen a +// configuration change. After learning about a span update through a callback +// invocation, callers can consult the embedded StoreReader to retrieve an +// up-to-date[2] config for the updated span. The callback is called in a single +// goroutine; it should avoid doing any long-running or blocking work. +// +// When a callback is first installed, it's invoked with the [min,max) span -- +// a shorthand to indicate that callers should consult the StoreReader for all +// spans of interest. Subsequent updates are of the more incremental kind. It's +// possible that the span updates received are no-ops, i.e. consulting the +// StoreReader for the given span would still retrieve the last config observed +// for the span[3]. +// +// [1]: The contents of the StoreReader at t1 corresponds exactly to the +// contents of the global span configuration state at t0 where t0 <= t1. If +// the StoreReader is read from at t2 where t2 > t1, it's guaranteed to +// observe a view of the global state at t >= t0. +// [2]: For the canonical KVSubscriber implementation, this is typically lagging +// by the closed timestamp target duration. 
+// [3]: The canonical KVSubscriber implementation is bounced whenever errors +// occur, which may result in the re-transmission of earlier updates +// (typically through a coarsely targeted [min,max) span). +type KVSubscriber interface { + StoreReader + Subscribe(func(updated roachpb.Span)) } // SQLTranslator translates SQL descriptors and their corresponding zone // configurations to constituent spans and span configurations. // // Concretely, for the following zone configuration hierarchy: +// // CREATE DATABASE db; // CREATE TABLE db.t1(); // ALTER DATABASE db CONFIGURE ZONE USING num_replicas=7; // ALTER TABLE db.t1 CONFIGURE ZONE USING num_voters=5; +// // The SQLTranslator produces the following translation (represented as a diff // against RANGE DEFAULT for brevity): +// // Table/5{3-4} num_replicas=7 num_voters=5 type SQLTranslator interface { - // Translate generates the implied span configuration state given a list of // Translate generates the span configuration state given a list of - // {descriptor, named zone} IDs. No entry is returned for an ID if it doesn't - // exist or has been dropped. The timestamp at which the translation is valid - // is also returned. + // {descriptor, named zone} IDs. No entry is returned for an ID if it + // doesn't exist or if it's dropped. The timestamp at which the translation + // is valid is also returned. // - // For every ID we first descend the zone configuration hierarchy with the ID - // as the root to accumulate IDs of all leaf objects. Leaf objects are tables - // and named zones (other than RANGE DEFAULT) which have actual span - // configurations associated with them (as opposed to non-leaf nodes that only - // serve to hold zone configurations for inheritance purposes). Then, for - // for every one of these accumulated IDs, we generate - // tuples by following up the inheritance chain to fully hydrate the span - // configuration. Translate also accounts for and negotiates subzone spans. 
+ // For every ID we first descend the zone configuration hierarchy with the + // ID as the root to accumulate IDs of all leaf objects. Leaf objects are + // tables and named zones (other than RANGE DEFAULT) which have actual span + // configurations associated with them (as opposed to non-leaf nodes that + // only serve to hold zone configurations for inheritance purposes). Then, + // for each one of these accumulated IDs, we generate <span, span config> + // tuples by following up the inheritance chain to fully hydrate the + // span configuration. Translate also accounts for and negotiates subzone + // spans. Translate(ctx context.Context, ids descpb.IDs) ([]roachpb.SpanConfigEntry, hlc.Timestamp, error) } @@ -178,8 +218,8 @@ type StoreReader interface { GetSpanConfigForKey(ctx context.Context, key roachpb.RKey) (roachpb.SpanConfig, error) } -// Update captures what span has seen a config change. It will be the unit of -// what a {SQL,KV}Watcher emits, and what can be applied to a StoreWriter. +// Update captures a span and the corresponding config change. It's the unit of +// what can be applied to a StoreWriter. type Update struct { // Span captures the key span being updated. Span roachpb.Span diff --git a/pkg/spanconfig/spanconfigjob/job.go b/pkg/spanconfig/spanconfigjob/job.go index c748b70554c4..fa461a0d11cb 100644 --- a/pkg/spanconfig/spanconfigjob/job.go +++ b/pkg/spanconfig/spanconfigjob/job.go @@ -30,8 +30,8 @@ var _ jobs.Resumer = (*resumer)(nil) func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) error { execCtx := execCtxI.(sql.JobExecContext) rc := execCtx.SpanConfigReconciliationJobDeps() - // TODO(zcfg-pod): Upcoming PRs will actually make use of these reconciliation - // dependencies. + + // TODO(irfansharif): Actually make use of these dependencies.
_ = rc <-ctx.Done() diff --git a/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel b/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel new file mode 100644 index 000000000000..37c3ba4b4ca7 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel @@ -0,0 +1,75 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "spanconfigkvsubscriber", + srcs = [ + "kvsubscriber.go", + "span_config_decoder.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber", + visibility = ["//visibility:public"], + deps = [ + "//pkg/base", + "//pkg/keys", + "//pkg/kv", + "//pkg/kv/kvclient/rangefeed:with-mocks", + "//pkg/kv/kvclient/rangefeed/rangefeedbuffer", + "//pkg/roachpb:with-mocks", + "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigstore", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/systemschema", + "//pkg/sql/row", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", + "//pkg/sql/types", + "//pkg/util/encoding", + "//pkg/util/grpcutil", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/protoutil", + "//pkg/util/retry", + "//pkg/util/stop", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "spanconfigkvsubscriber_test", + srcs = [ + "datadriven_test.go", + "kvsubscriber_test.go", + "main_test.go", + "span_config_decoder_test.go", + ], + data = glob(["testdata/**"]), + embed = [":spanconfigkvsubscriber"], + deps = [ + "//pkg/base", + "//pkg/keys", + "//pkg/kv/kvclient/rangefeed:with-mocks", + "//pkg/kv/kvclient/rangefeed/rangefeedbuffer", + "//pkg/roachpb:with-mocks", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigkvaccessor", + "//pkg/spanconfig/spanconfigtestutils", + "//pkg/sql/sqlutil", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/hlc", + 
"//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/protoutil", + "//pkg/util/syncutil", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go b/pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go new file mode 100644 index 000000000000..59cfe1e0e1c4 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go @@ -0,0 +1,312 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvsubscriber_test + +import ( + "context" + "fmt" + "sort" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/datadriven" + 
"github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestDataDriven runs datadriven tests against the KVSubscriber interface. +// The syntax is as follows: +// +// update +// delete [c,e) +// upsert [c,d):C +// upsert [d,e):D +// ---- +// ok +// +// get +// span [a,b) +// span [b,c) +// ---- +// [a,b):A +// [b,d):B +// +// start +// ---- +// +// updates +// ---- +// [a,b) +// [b,d) +// [e,f) +// +// store-reader key=b +// ---- +// [b,d):B +// +// store-reader compute-split=[a,c) +// ---- +// b +// +// store-reader needs-split=[b,h) +// ---- +// true +// +// inject-buffer-overflow +// ---- +// ok +// +// - update and get tie into GetSpanConfigEntriesFor and +// UpdateSpanConfigEntries respectively on the KVAccessor interface, and are a +// convenient shorthand to populate the system table that the KVSubscriber +// subscribes to. The input is processed in a single batch. +// - start starts the subscription process. It can also be used to verify +// behavior when re-establishing subscriptions after hard errors. +// - updates lists the span updates the KVSubscriber receives, in the listed +// order. Updates in a batch are de-duped. +// - store-reader {key,compute-split,needs-split} relate to GetSpanConfigForKey, +// ComputeSplitKey and NeedsSplit respectively on the StoreReader subset of the +// KVSubscriber interface. +// - inject-buffer-overflow can be used to inject rangefeed buffer overflow +// errors within the kvsubscriber. It pokes into the internals of the +// kvsubscriber and is useful to test teardown and recovery behavior. +// +// Text of the form [a,b) and [a,b):C correspond to spans and span config +// entries; see spanconfigtestutils.Parse{Span,Config,SpanConfigEntry} for more +// details. 
+func TestDataDriven(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	datadriven.Walk(t, testutils.TestDataPath(t), func(t *testing.T, path string) {
+		ctx := context.Background()
+		ctx, cancel := context.WithCancel(ctx)
+		tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
+			ServerArgs: base.TestServerArgs{
+				EnableSpanConfigs: true,
+			},
+		})
+		defer cancel()
+		defer tc.Stopper().Stop(ctx)
+
+		ts := tc.Server(0)
+		tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+		tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
+		tdb.Exec(t, `SET CLUSTER SETTING spanconfig.experimental_kvaccessor.enabled = true`)
+		tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
+
+		const dummyTableName = "dummy_span_configurations"
+		tdb.Exec(t, fmt.Sprintf("CREATE TABLE %s (LIKE system.span_configurations INCLUDING ALL)", dummyTableName))
+
+		var dummyTableID uint32
+		tdb.QueryRow(t, fmt.Sprintf(
+			`SELECT table_id from crdb_internal.tables WHERE name = '%s'`, dummyTableName),
+		).Scan(&dummyTableID)
+
+		kvAccessor := spanconfigkvaccessor.New(
+			tc.Server(0).DB(),
+			tc.Server(0).InternalExecutor().(sqlutil.InternalExecutor),
+			tc.Server(0).ClusterSettings(),
+			fmt.Sprintf("defaultdb.public.%s", dummyTableName),
+		)
+
+		mu := struct {
+			syncutil.Mutex
+			lastFrontierTS    hlc.Timestamp // serializes the updates and update commands
+			subscriberRunning bool          // serializes updates, start, and inject-buffer-overflow
+			receivedUpdates   roachpb.Spans // spans handed to the subscriber callback since the last updates command
+		}{}
+		injectedErrCh := make(chan error)
+
+		kvSubscriber := spanconfigkvsubscriber.New(
+			ts.Stopper(),
+			ts.DB(),
+			ts.Clock(),
+			ts.RangeFeedFactory().(*rangefeed.Factory),
+			dummyTableID,
+			10<<20, /* 10 MB */
+			spanconfigtestutils.ParseConfig(t, "MISSING"),
+			&spanconfig.TestingKnobs{
+				KVSubscriberOnTimestampAdvanceInterceptor: func(ts hlc.Timestamp) {
+					mu.Lock()
+					defer mu.Unlock()
+					mu.lastFrontierTS = ts
+				},
+				KVSubscriberPostRangefeedStartInterceptor: func() {
+					mu.Lock()
+					defer mu.Unlock()
+					mu.subscriberRunning = true
+				},
+				KVSubscriberPreExitInterceptor: func() {
+					mu.Lock()
+					defer mu.Unlock()
+					mu.subscriberRunning = false
+				},
+				KVSubscriberErrorInjectionCh: injectedErrCh,
+			},
+		)
+
+		kvSubscriber.Subscribe(func(span roachpb.Span) {
+			mu.Lock()
+			defer mu.Unlock()
+			mu.receivedUpdates = append(mu.receivedUpdates, span)
+		})
+
+		var lastUpdateTS hlc.Timestamp
+		datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
+			switch d.Cmd {
+			case "get":
+				spans := spanconfigtestutils.ParseKVAccessorGetArguments(t, d.Input)
+				entries, err := kvAccessor.GetSpanConfigEntriesFor(ctx, spans)
+				require.NoError(t, err)
+
+				var output strings.Builder
+				for _, entry := range entries {
+					output.WriteString(fmt.Sprintf("%s\n", spanconfigtestutils.PrintSpanConfigEntry(entry)))
+				}
+				return output.String()
+
+			case "update":
+				toDelete, toUpsert := spanconfigtestutils.ParseKVAccessorUpdateArguments(t, d.Input)
+				require.NoError(t, kvAccessor.UpdateSpanConfigEntries(ctx, toDelete, toUpsert))
+				lastUpdateTS = ts.Clock().Now()
+
+			case "start":
+				mu.Lock()
+				require.False(t, mu.subscriberRunning, "subscriber already running")
+				mu.Unlock()
+
+				go func() {
+					_ = kvSubscriber.TestingRunInner(ctx) // injected errors and context cancellation are expected here
+				}()
+				testutils.SucceedsSoon(t, func() error {
+					mu.Lock()
+					defer mu.Unlock()
+					if !mu.subscriberRunning {
+						return errors.New("expected subscriber to have started")
+					}
+					return nil
+				})
+
+			case "updates":
+				testutils.SucceedsSoon(t, func() error {
+					mu.Lock()
+					defer mu.Unlock()
+
+					if !mu.subscriberRunning {
+						// The subscriber isn't running, we're not expecting any
+						// frontier bumps.
+						return nil
+					}
+
+					// The subscriber is running -- we should be observing
+					// frontier bumps. In order to serialize after the last
+					// kvaccessor-update, let's wait until the frontier timestamp
+					// is past it.
+					if lastUpdateTS.LessEq(mu.lastFrontierTS) {
+						return nil
+					}
+
+					return errors.Newf("frontier timestamp (%s) lagging last update (%s)",
+						mu.lastFrontierTS.String(), lastUpdateTS.String())
+				}) // TODO(irfansharif): We could use a tighter bound here, but it's unreliable under stress.
+
+				mu.Lock()
+				receivedUpdates := mu.receivedUpdates
+				mu.receivedUpdates = nil // clear out buffer; a fresh slice keeps later callback appends off the array read below
+				mu.Unlock()
+
+				var output strings.Builder
+				sort.Sort(receivedUpdates)
+				for i, update := range receivedUpdates {
+					if i != 0 && receivedUpdates[i].Equal(receivedUpdates[i-1]) {
+						continue // de-dup updates
+					}
+
+					var spanStr string
+					if update.Equal(keys.EverythingSpan) {
+						spanStr = update.String()
+					} else {
+						spanStr = spanconfigtestutils.PrintSpan(update)
+					}
+					output.WriteString(fmt.Sprintf("%s\n", spanStr))
+				}
+
+				return output.String()
+
+			case "inject-buffer-overflow":
+				injectedErrCh <- rangefeedbuffer.ErrBufferLimitExceeded
+				testutils.SucceedsSoon(t, func() error {
+					mu.Lock()
+					defer mu.Unlock()
+					if mu.subscriberRunning {
+						return errors.New("expected subscriber to have stopped")
+					}
+					return nil
+
+				})
+
+			case "store-reader":
+				if len(d.CmdArgs) != 1 {
+					d.Fatalf(t, "unexpected number of args (%d), expected 1", len(d.CmdArgs))
+				}
+				cmdArg := d.CmdArgs[0]
+
+				switch cmdArg.Key {
+				case "key":
+					var keyStr string
+					d.ScanArgs(t, cmdArg.Key, &keyStr)
+					config, err := kvSubscriber.GetSpanConfigForKey(ctx, roachpb.RKey(keyStr))
+					require.NoError(t, err)
+					return fmt.Sprintf("conf=%s", spanconfigtestutils.PrintSpanConfig(config))
+
+				case "compute-split":
+					var spanStr string
+					d.ScanArgs(t, cmdArg.Key, &spanStr)
+					span := spanconfigtestutils.ParseSpan(t, spanStr)
+					start, end := roachpb.RKey(span.Key), roachpb.RKey(span.EndKey)
+					splitKey := kvSubscriber.ComputeSplitKey(ctx, start, end)
+					return string(splitKey)
+
+				case "needs-split":
+					var spanStr string
+					d.ScanArgs(t, cmdArg.Key, &spanStr)
+					span := spanconfigtestutils.ParseSpan(t, spanStr)
+					start, end := roachpb.RKey(span.Key), roachpb.RKey(span.EndKey)
+					result := kvSubscriber.NeedsSplit(ctx, start, end)
+					return fmt.Sprintf("%t", result)
+
+				default:
+					t.Fatalf("unknown argument: %s", cmdArg.Key)
+				}
+
+			default:
+				t.Fatalf("unknown command: %s", d.Cmd)
+			}
+			return ""
+		})
+	})
+}
diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
new file mode 100644
index 000000000000..a59c4318af18
--- /dev/null
+++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
@@ -0,0 +1,437 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package spanconfigkvsubscriber
+
+import (
+	"context"
+	"strings"
+	"sync/atomic"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/spanconfig"
+	"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
+	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/errors"
+)
+
+// KVSubscriber is used to subscribe to global span configuration changes. It's
+// a concrete implementation of the spanconfig.KVSubscriber interface.
+// +// Internally we maintain a rangefeed over the global store of span +// configurations (system.span_configurations), applying updates from it into an +// internal spanconfig.Store. A read-only view of this data structure +// (spanconfig.StoreReader) is exposed as part of the KVSubscriber interface. +// Rangefeeds used as is don't offer any ordering guarantees with respect to +// updates made over non-overlapping keys, which is something we care about[1]. +// For that reason we make use of a rangefeed buffer, accumulating raw rangefeed +// updates and flushing them out en-masse in timestamp order when the rangefeed +// frontier is bumped[2]. If the buffer overflows (as dictated by the memory +// limit the KVSubscriber is instantiated with), the subscriber is wound down and +// an appropriate error is returned to the caller. +// +// When running into the errors above, it's safe for the caller to re-subscribe +// to effectively re-establish the underlying rangefeeds. When re-establishing a +// new rangefeed and populating a spanconfig.Store using the contents of the +// initial scan[3], we wish to preserve the existing spanconfig.StoreReader. +// Discarding it would entail either blocking all external readers until a new +// spanconfig.StoreReader was fully populated, or presenting an inconsistent +// view of the spanconfig.Store that's currently being populated. For new +// rangefeeds what we do then is route all updates from the initial scan to a +// fresh spanconfig.Store, and once the initial scan is done, swap at the +// source for the exported spanconfig.StoreReader. During the initial scan, +// concurrent readers would continue to observe the last spanconfig.StoreReader +// if any. After the swap, it would observe the more up-to-date source instead. +// Future incremental updates will also target the new source. When this source +// swap occurs, we inform the handler of the need to possibly refresh its view +// of all configs. 
+// +// TODO(irfansharif): When swapping the old spanconfig.StoreReader for the new, +// instead of informing callers with an everything [min,max) span, we could diff +// the two data structures and only emit targeted updates. +// +// [1]: For a given key k, its config may be stored as part of a larger span S +// (where S.start <= k < S.end). It's possible for S to get deleted and +// replaced with sub-spans S1...SN in the same transaction if the span is +// getting split. When applying these updates, we need to make sure to +// process the deletion event for S before processing S1...SN. +// [2]: In our example above, deleting the config for S and adding configs for S1...SN, we +// want to make sure that we apply the full set of updates all at once -- +// lest we expose the intermediate state where the config for S was deleted +// but the configs for S1...SN were not yet applied. +// [3]: TODO(irfansharif): When tearing down the subscriber due to underlying errors, +// we could also surface a checkpoint to use the next time the subscriber is +// established. That way we can avoid the full initial scan over the span +// configuration state and simply pick up where we left off with our existing +// spanconfig.Store. +// +type KVSubscriber struct { + stopper *stop.Stopper + db *kv.DB + clock *hlc.Clock + rangefeedFactory *rangefeed.Factory + decoder *spanConfigDecoder + spanConfigTableSpan roachpb.Span // typically system.span_configurations, but overridable for tests + bufferMemLimit int64 + fallback roachpb.SpanConfig + knobs *spanconfig.TestingKnobs + + subscribed int32 // accessed atomically + mu struct { // serializes between Start and external threads + syncutil.RWMutex + // internal is the internal spanconfig.Store maintained by the + // KVSubscriber. A read-only view over this store is exposed as part of + // the interface. When re-subscribing, a fresh spanconfig.Store is + // populated while the exposed spanconfig.StoreReader appears static.
+ // Once sufficiently caught up, the fresh spanconfig.Store is swapped in + // and the old discarded. See type-level comment for more details. + internal spanconfig.Store + handlers []handler + } + + lastFrontierTS hlc.Timestamp // used to assert monotonicity across subscription attempts +} + +var _ spanconfig.KVSubscriber = &KVSubscriber{} + +// spanConfigurationsTableRowSize is an estimate of the size of a single row in +// the system.span_configurations table (size of start/end key, and size of a +// marshaled span config proto). The value used here was pulled out of thin air +// -- it only serves to coarsely limit how large the KVSubscriber's underlying +// rangefeed buffer can get. +const spanConfigurationsTableRowSize = 5 << 10 // 5 KB + +// New instantiates a KVSubscriber. +func New( + stopper *stop.Stopper, + db *kv.DB, + clock *hlc.Clock, + rangeFeedFactory *rangefeed.Factory, + spanConfigurationsTableID uint32, + bufferMemLimit int64, + fallback roachpb.SpanConfig, + knobs *spanconfig.TestingKnobs, +) *KVSubscriber { + spanConfigTableStart := keys.SystemSQLCodec.IndexPrefix( + spanConfigurationsTableID, + keys.SpanConfigurationsTablePrimaryKeyIndexID, + ) + spanConfigTableSpan := roachpb.Span{ + Key: spanConfigTableStart, + EndKey: spanConfigTableStart.PrefixEnd(), + } + spanConfigStore := spanconfigstore.New(fallback) + if knobs == nil { + knobs = &spanconfig.TestingKnobs{} + } + s := &KVSubscriber{ + stopper: stopper, + db: db, + clock: clock, + bufferMemLimit: bufferMemLimit, + rangefeedFactory: rangeFeedFactory, + spanConfigTableSpan: spanConfigTableSpan, + fallback: fallback, + knobs: knobs, + decoder: newSpanConfigDecoder(), + } + s.mu.internal = spanConfigStore + return s +} + +// Start establishes a subscription (internally: rangefeed) over the global +// store of span configs. 
It fires off an async task to do so, re-establishing +// internally when retryable errors[1] occur and stopping only when the surrounding +// stopper is quiescing or the context canceled. All installed handlers are +// invoked in the single async task thread. +// +// [1]: It's possible for retryable errors to occur internally, at which point +// we tear down the existing subscription and re-establish another. When +// unsubscribed, the exposed spanconfig.StoreReader continues to be +// readable (though no longer incrementally maintained -- the view gets +// progressively staler over time). Existing handlers are kept intact and +// notified when the subscription is re-established. After re-subscribing, +// the exported StoreReader will be up-to-date and continue to be +// incrementally maintained. +func (s *KVSubscriber) Start(ctx context.Context) error { + return s.stopper.RunAsyncTask(ctx, "spanconfig-kvsubscriber", func(ctx context.Context) { + ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx) + defer cancel() + + for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { + if err := s.run(ctx); err != nil { + if errors.Is(err, context.Canceled) { + return // we're done here + } + + log.Warningf(ctx, "spanconfig-kvsubscriber failed with %v, retrying...", err) + continue + } + + return // we're done here + } + }) +} + +// run establishes a rangefeed over the global store of span configs. +// This is a blocking operation, returning (and unsubscribing) only when the +// surrounding stopper is stopped, the context canceled, or when a retryable +// error occurs. For the latter, it's expected that callers will re-run the +// subscriber.
+func (s *KVSubscriber) run(ctx context.Context) error { + if !atomic.CompareAndSwapInt32(&s.subscribed, 0, 1) { + log.Fatal(ctx, "currently subscribed: only allowed once at any point in time") + } + if fn := s.knobs.KVSubscriberPreExitInterceptor; fn != nil { + defer fn() + } + defer func() { atomic.StoreInt32(&s.subscribed, 0) }() + + buffer := rangefeedbuffer.New(int(s.bufferMemLimit / spanConfigurationsTableRowSize)) + frontierBumpedCh, initialScanDoneCh, errCh := make(chan struct{}), make(chan struct{}), make(chan error) + mu := struct { // serializes access between the rangefeed and the main thread here + syncutil.Mutex + frontierTS hlc.Timestamp + }{} + + defer func() { + mu.Lock() + s.lastFrontierTS = mu.frontierTS + mu.Unlock() + }() + + onValue := func(ctx context.Context, ev *roachpb.RangeFeedValue) { + deleted := !ev.Value.IsPresent() + var value roachpb.Value + if deleted { + if !ev.PrevValue.IsPresent() { + // It's possible to write a KV tombstone on top of another KV + // tombstone -- both the new and old value will be empty. We simply + // ignore these events. + return + } + + // Since the end key is not part of the primary key, we need to + // decode the previous value in order to determine what it is. + value = ev.PrevValue + } else { + value = ev.Value + } + entry, err := s.decoder.decode(roachpb.KeyValue{ + Key: ev.Key, + Value: value, + }) + if err != nil { + log.Fatalf(ctx, "failed to decode row: %v", err) // non-retryable error; just fatal + } + + if log.ExpensiveLogEnabled(ctx, 1) { + log.Infof(ctx, "received span configuration update for %s (deleted=%t)", entry.Span, deleted) + } + + update := spanconfig.Update{Span: entry.Span} + if !deleted { + update.Config = entry.Config + } + + if err := buffer.Add(ctx, &bufferEvent{update, ev.Value.Timestamp}); err != nil { + select { + case <-ctx.Done(): + // The context is canceled when the rangefeed is closed by the + // main handler goroutine. It's closed after we stop listening + // to errCh. 
+ case errCh <- err: + } + } + } + + initialScanTS := s.clock.Now() + if initialScanTS.Less(s.lastFrontierTS) { + log.Fatalf(ctx, "initial scan timestamp (%s) regressed from last recorded frontier (%s)", initialScanTS, s.lastFrontierTS) + } + + rangeFeed := s.rangefeedFactory.New("spanconfig-rangefeed", s.spanConfigTableSpan, initialScanTS, + onValue, + rangefeed.WithInitialScan(func(ctx context.Context) { + select { + case <-ctx.Done(): + // The context is canceled when the rangefeed is closed by the + // main handler goroutine. It's closed after we stop listening + // to initialScanDoneCh. + case initialScanDoneCh <- struct{}{}: + } + }), + rangefeed.WithOnFrontierAdvance(func(ctx context.Context, frontierTS hlc.Timestamp) { + mu.Lock() + mu.frontierTS = frontierTS + mu.Unlock() + + select { + case <-ctx.Done(): + case frontierBumpedCh <- struct{}{}: + } + }), + rangefeed.WithDiff(), + rangefeed.WithOnInitialScanError(func(ctx context.Context, err error) (shouldFail bool) { + // TODO(irfansharif): Consider if there are other errors which we + // want to treat as permanent. This was cargo culted from the + // settings watcher. 
+ if grpcutil.IsAuthError(err) || + strings.Contains(err.Error(), "rpc error: code = Unauthenticated") { + return true + } + return false + }), + ) + if err := rangeFeed.Start(ctx); err != nil { + return err + } + defer rangeFeed.Close() + if fn := s.knobs.KVSubscriberPostRangefeedStartInterceptor; fn != nil { + fn() + } + + log.Info(ctx, "established range feed over span configurations table") + + injectedErrCh := s.knobs.KVSubscriberErrorInjectionCh + + for { + select { + case <-s.stopper.ShouldQuiesce(): + return nil + case <-ctx.Done(): + return ctx.Err() + case <-frontierBumpedCh: + mu.Lock() + frontierTS := mu.frontierTS + mu.Unlock() + + events := buffer.Flush(ctx, frontierTS) + s.mu.Lock() + for _, ev := range events { + s.mu.internal.Apply(ctx, ev.(*bufferEvent).Update, false /* dryrun */) + } + handlers := s.mu.handlers + s.mu.Unlock() + + for _, h := range handlers { + for _, ev := range events { + h.invoke(ev.(*bufferEvent).Update.Span) + } + } + + if fn := s.knobs.KVSubscriberOnTimestampAdvanceInterceptor; fn != nil { + fn(frontierTS) + } + case <-initialScanDoneCh: + events := buffer.Flush(ctx, initialScanTS) + freshStore := spanconfigstore.New(s.fallback) + for _, ev := range events { + freshStore.Apply(ctx, ev.(*bufferEvent).Update, false /* dryrun */) + } + + s.mu.Lock() + s.mu.internal = freshStore + handlers := s.mu.handlers + s.mu.Unlock() + + for _, h := range handlers { + // When re-establishing a rangefeed, it's possible we have a + // spanconfig.Store with arbitrary updates from what was + // exported last. Let's inform the handler than everything needs + // to be checked again. + h.invoke(keys.EverythingSpan) + } + + if fn := s.knobs.KVSubscriberOnTimestampAdvanceInterceptor; fn != nil { + fn(initialScanTS) + } + case err := <-errCh: + return err + case err := <-injectedErrCh: + return err + } + } +} + +// Subscribe installs a callback that's invoked with whatever span may have seen +// a config update. 
+func (s *KVSubscriber) Subscribe(fn func(roachpb.Span)) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.mu.handlers = append(s.mu.handlers, handler{initialized: new(bool), fn: fn})
+}
+
+// NeedsSplit is part of the spanconfig.KVSubscriber interface.
+func (s *KVSubscriber) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	return s.mu.internal.NeedsSplit(ctx, start, end)
+}
+
+// ComputeSplitKey is part of the spanconfig.KVSubscriber interface.
+func (s *KVSubscriber) ComputeSplitKey(ctx context.Context, start, end roachpb.RKey) roachpb.RKey {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	return s.mu.internal.ComputeSplitKey(ctx, start, end)
+}
+
+// GetSpanConfigForKey is part of the spanconfig.KVSubscriber interface.
+func (s *KVSubscriber) GetSpanConfigForKey(
+	ctx context.Context, key roachpb.RKey,
+) (roachpb.SpanConfig, error) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	return s.mu.internal.GetSpanConfigForKey(ctx, key)
+}
+
+type handler struct {
+	initialized *bool // shared by every copy of the handler; tracks whether we still need to invoke with a [min,max) span first
+	fn          func(update roachpb.Span)
+}
+
+// invoke runs the callback for update, preceded by the one-time [min,max) invocation if that hasn't happened yet.
+func (h handler) invoke(update roachpb.Span) {
+	if !*h.initialized {
+		h.fn(keys.EverythingSpan)
+		*h.initialized = true // handlers are passed around by value, so the flag is written through the shared pointer
+		if update.Equal(keys.EverythingSpan) {
+			return // we can opportunistically avoid re-invoking with the same update
+		}
+	}
+
+	h.fn(update)
+}
+
+type bufferEvent struct {
+	spanconfig.Update
+	ts hlc.Timestamp
+}
+
+// Timestamp implements the rangefeedbuffer.Event interface.
+func (w *bufferEvent) Timestamp() hlc.Timestamp {
+	return w.ts
+}
+
+var _ rangefeedbuffer.Event = &bufferEvent{}
diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go
new file mode 100644
index 000000000000..d5ba0fe16ab2
--- /dev/null
+++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go
@@ -0,0 +1,18 @@
+// Copyright 2021 The Cockroach Authors.
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvsubscriber + +import "context" + +// TestingRunInner exports the inner run method for testing purposes. +func (s *KVSubscriber) TestingRunInner(ctx context.Context) error { + return s.run(ctx) +} diff --git a/pkg/spanconfig/spanconfigkvsubscriber/main_test.go b/pkg/spanconfig/spanconfigkvsubscriber/main_test.go new file mode 100644 index 000000000000..177f2e83fa44 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+
+package spanconfigkvsubscriber_test
+
+import (
+	"os"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/security"
+	"github.com/cockroachdb/cockroach/pkg/security/securitytest"
+	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+)
+
+func TestMain(m *testing.M) {
+	security.SetAssetLoader(securitytest.EmbeddedAssets)
+	serverutils.InitTestServerFactory(server.TestServerFactory)
+	serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
+	os.Exit(m.Run())
+}
+
+//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
diff --git a/pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder.go b/pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder.go
new file mode 100644
index 000000000000..76fa92e1f7a8
--- /dev/null
+++ b/pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder.go
@@ -0,0 +1,127 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package spanconfigkvsubscriber
+
+import (
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
+	"github.com/cockroachdb/cockroach/pkg/sql/row"
+	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/util/encoding"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/errors"
+)
+
+// spanConfigDecoder decodes rows from system.span_configurations. It's not
+// safe for concurrent use.
+type spanConfigDecoder struct {
+	alloc     rowenc.DatumAlloc
+	colIdxMap catalog.TableColMap
+}
+
+// newSpanConfigDecoder instantiates a spanConfigDecoder.
+func newSpanConfigDecoder() *spanConfigDecoder {
+	return &spanConfigDecoder{
+		colIdxMap: row.ColIDtoRowIndexFromCols(
+			systemschema.SpanConfigurationsTable.PublicColumns(),
+		),
+	}
+}
+
+// decode a span config entry given a KV from the
+// system.span_configurations table. The start key is decoded from the index
+// key; the end key and config are decoded from the value, which must be
+// present. Keys that don't match the table's primary index, malformed values,
+// and value columns that aren't among the table's public columns all result in
+// an error.
+func (sd *spanConfigDecoder) decode(kv roachpb.KeyValue) (entry roachpb.SpanConfigEntry, _ error) {
+	tbl := systemschema.SpanConfigurationsTable
+	// First we need to decode the start_key field from the index key.
+	{
+		types := []*types.T{tbl.PublicColumns()[0].GetType()}
+		startKeyRow := make([]rowenc.EncDatum, 1)
+		_, matches, _, err := rowenc.DecodeIndexKey(
+			keys.SystemSQLCodec, tbl, tbl.GetPrimaryIndex(),
+			types, startKeyRow, nil, kv.Key,
+		)
+		if err != nil {
+			return roachpb.SpanConfigEntry{}, errors.Wrapf(err, "failed to decode key: %v", kv.Key)
+		}
+		if !matches {
+			return roachpb.SpanConfigEntry{},
+				errors.AssertionFailedf(
+					"system.span_configurations descriptor does not match key: %v", kv.Key,
+				)
+		}
+		if err := startKeyRow[0].EnsureDecoded(types[0], &sd.alloc); err != nil {
+			return roachpb.SpanConfigEntry{}, err
+		}
+		entry.Span.Key = []byte(tree.MustBeDBytes(startKeyRow[0].Datum))
+	}
+	if !kv.Value.IsPresent() {
+		return roachpb.SpanConfigEntry{},
+			errors.AssertionFailedf("missing value for start key: %s", entry.Span.Key)
+	}
+
+	// The remaining columns are stored as a family, packed with diff-encoded
+	// column IDs followed by their values.
+	{
+		bytes, err := kv.Value.GetTuple()
+		if err != nil {
+			return roachpb.SpanConfigEntry{}, err
+		}
+		var colIDDiff uint32
+		var lastColID descpb.ColumnID
+		var res tree.Datum
+		for len(bytes) > 0 {
+			_, _, colIDDiff, _, err = encoding.DecodeValueTag(bytes)
+			if err != nil {
+				return roachpb.SpanConfigEntry{}, err
+			}
+			colID := lastColID + descpb.ColumnID(colIDDiff)
+			lastColID = colID
+			idx, ok := sd.colIdxMap.Get(colID)
+			if !ok {
+				// bytes is only advanced by DecodeTableValue below, which needs
+				// the column's type. Fail on a column we can't look up instead
+				// of re-reading the same value tag on every iteration.
+				return roachpb.SpanConfigEntry{}, errors.AssertionFailedf("unknown column: %v", colID)
+			}
+			res, bytes, err = rowenc.DecodeTableValue(&sd.alloc, tbl.PublicColumns()[idx].GetType(), bytes)
+			if err != nil {
+				return roachpb.SpanConfigEntry{}, err
+			}
+
+			switch colID {
+			case tbl.PublicColumns()[1].GetID(): // end_key
+				entry.Span.EndKey = []byte(tree.MustBeDBytes(res))
+			case tbl.PublicColumns()[2].GetID(): // config
+				if err := protoutil.Unmarshal([]byte(tree.MustBeDBytes(res)), &entry.Config); err != nil {
+					return roachpb.SpanConfigEntry{}, err
+				}
+			default:
+				return roachpb.SpanConfigEntry{}, errors.AssertionFailedf("unknown column: %v", colID)
+			}
+		}
+	}
+
+	return entry, nil
+}
+
+// TestingDecoderFn exports the decoding routine for testing purposes.
+func TestingDecoderFn() func(roachpb.KeyValue) (roachpb.SpanConfigEntry, error) {
+	return newSpanConfigDecoder().decode
+}
diff --git a/pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder_test.go b/pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder_test.go
new file mode 100644
index 000000000000..18e56e4b704a
--- /dev/null
+++ b/pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder_test.go
@@ -0,0 +1,128 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package spanconfigkvsubscriber_test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/stretchr/testify/require"
+)
+
+// TestSpanConfigDecoder verifies that we can decode rows stored in the
+// system.span_configurations table.
+func TestSpanConfigDecoder(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + const dummyTableName = "dummy_span_configurations" + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + tdb.Exec(t, fmt.Sprintf("CREATE TABLE %s (LIKE system.span_configurations INCLUDING ALL)", dummyTableName)) + + var dummyTableID uint32 + tdb.QueryRow(t, fmt.Sprintf( + `SELECT table_id FROM crdb_internal.tables WHERE name = '%s'`, dummyTableName), + ).Scan(&dummyTableID) + + getCount := func() int { + q := tdb.Query(t, fmt.Sprintf(`SELECT count(*) FROM %s`, dummyTableName)) + q.Next() + var c int + require.Nil(t, q.Scan(&c)) + require.Nil(t, q.Close()) + return c + } + initialCount := getCount() + + key := tc.ScratchRange(t) + rng := tc.GetFirstStoreFromServer(t, 0).LookupReplica(keys.MustAddr(key)) + span := rng.Desc().RSpan().AsRawSpanWithNoLocals() + conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3} + + buf, err := protoutil.Marshal(&conf) + require.NoError(t, err) + tdb.Exec(t, fmt.Sprintf(`UPSERT INTO %s (start_key, end_key, config) VALUES ($1, $2, $3)`, + dummyTableName), span.Key, span.EndKey, buf) + require.Equal(t, initialCount+1, getCount()) + + k := keys.SystemSQLCodec.IndexPrefix(dummyTableID, keys.SpanConfigurationsTablePrimaryKeyIndexID) + rows, err := tc.Server(0).DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */) + require.NoError(t, err) + require.Len(t, rows, initialCount+1) + + last := rows[len(rows)-1] + got, err := spanconfigkvsubscriber.TestingDecoderFn()( + roachpb.KeyValue{ + Key: last.Key, + Value: *last.Value, + }, + ) + require.NoError(t, err) + require.Truef(t, span.Equal(got.Span), + "expected span=%s, got span=%s", span, got.Span) + require.Truef(t, conf.Equal(got.Config), + "expected config=%s, got config=%s", conf, got.Config) +} + +func BenchmarkSpanConfigDecoder(b *testing.B) { + defer log.Scope(b).Close(b) + + s, db, 
_ := serverutils.StartServer( + b, base.TestServerArgs{UseDatabase: "bench"}) + defer s.Stopper().Stop(context.Background()) + + ctx := context.Background() + const dummyTableName = "dummy_span_configurations" + tdb := sqlutils.MakeSQLRunner(db) + + tdb.Exec(b, `CREATE DATABASE bench`) + tdb.Exec(b, fmt.Sprintf("CREATE TABLE %s (LIKE system.span_configurations INCLUDING ALL)", dummyTableName)) + + var dummyTableID uint32 + tdb.QueryRow(b, fmt.Sprintf( + `SELECT table_id from crdb_internal.tables WHERE name = '%s'`, dummyTableName), + ).Scan(&dummyTableID) + + conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3} + buf, err := protoutil.Marshal(&conf) + require.NoError(b, err) + + tdb.Exec(b, fmt.Sprintf(`UPSERT INTO %s (start_key, end_key, config) VALUES ($1, $2, $3)`, + dummyTableName), roachpb.Key("a"), roachpb.Key("b"), buf) + + k := keys.SystemSQLCodec.IndexPrefix(dummyTableID, keys.SpanConfigurationsTablePrimaryKeyIndexID) + rows, err := s.DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */) + require.NoError(b, err) + last := rows[len(rows)-1] + decoderFn := spanconfigkvsubscriber.TestingDecoderFn() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = decoderFn(roachpb.KeyValue{ + Key: last.Key, + Value: *last.Value, + }) + } +} diff --git a/pkg/spanconfig/spanconfigkvsubscriber/testdata/basic b/pkg/spanconfig/spanconfigkvsubscriber/testdata/basic new file mode 100644 index 000000000000..676a8d5aa502 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/testdata/basic @@ -0,0 +1,48 @@ +# Test the basic control flow: writes to the span configurations table should +# appear to the subscriber. Incremental updates should as well, and the exposed +# store reader reflecting said updates appropriately. 
+ +start +---- + +update +upsert [a,c):A +upsert [d,f):D +---- + +get +span [a,f) +---- +[a,c):A +[d,f):D + +updates +---- +/M{in-ax} +[a,c) +[d,f) + +store-reader key=a +---- +conf=A + +store-reader key=d +---- +conf=D + +store-reader compute-split=[a,e) +---- +d + +update +delete [d,f) +---- + +updates +---- +/M{in-ax} +[d,f) + +store-reader key=d +---- +conf=MISSING diff --git a/pkg/spanconfig/spanconfigkvsubscriber/testdata/buffer_overflow b/pkg/spanconfig/spanconfigkvsubscriber/testdata/buffer_overflow new file mode 100644 index 000000000000..fa7787f911e7 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/testdata/buffer_overflow @@ -0,0 +1,83 @@ +# Test the behavior of the kvsubscriber in the presence of internal +# subscription errors. During errors, the store-reader should present a snapshot +# view of the state before the error occurred. It should also be safe to +# bounce the same subscriber and have the handlers observe a [min,max) update +# indicating their view of all span configs needs to be refreshed. When +# consulting the store-reader after, it should observe a more up-to-date +# snapshot than earlier. We should also continue to observe incremental updates +# thereafter. + +start +---- + +update +upsert [a,c):A +upsert [d,f):D +---- + +updates +---- +/M{in-ax} +[a,c) +[d,f) + +store-reader key=a +---- +conf=A + +store-reader key=d +---- +conf=D + +# Inject a hard error. Subsequent updates aren't observed by the subscriber. The +# store-reader should also still be readable and present a snapshot of the state +# pre-error. +inject-buffer-overflow +---- + +update +upsert [a,c):B +delete [d,f) +---- + +updates +---- + +store-reader key=a +---- +conf=A + +store-reader key=d +---- +conf=D + +# Bounce the kvsubscriber. We should observe a catch-all update as a result, and +# observe a more up-to-date snapshot of the span configuration state. We should +# also receive incremental updates.
+start +---- + +updates +---- +/M{in-ax} + +store-reader key=a +---- +conf=B + +store-reader key=d +---- +conf=MISSING + +update +upsert [a,c):C +---- + +updates +---- +/M{in-ax} +[a,c) + +store-reader key=a +---- +conf=C diff --git a/pkg/spanconfig/spanconfigkvsubscriber/testdata/initial_state b/pkg/spanconfig/spanconfigkvsubscriber/testdata/initial_state new file mode 100644 index 000000000000..5437421e235c --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/testdata/initial_state @@ -0,0 +1,27 @@ +# Ensure that subscribers started after certain span configs have been +# deleted/overwritten never observe earlier state. + +update +upsert [a,c):A +upsert [d,f):D +---- + +update +delete [d,f) +upsert [a,c):B +---- + +start +---- + +updates +---- +/M{in-ax} + +store-reader key=a +---- +conf=B + +store-reader key=d +---- +conf=MISSING diff --git a/pkg/spanconfig/spanconfigstore/BUILD.bazel b/pkg/spanconfig/spanconfigstore/BUILD.bazel index 5d6524640d87..a17a28eade92 100644 --- a/pkg/spanconfig/spanconfigstore/BUILD.bazel +++ b/pkg/spanconfig/spanconfigstore/BUILD.bazel @@ -11,6 +11,7 @@ go_library( deps = [ "//pkg/keys", "//pkg/roachpb:with-mocks", + "//pkg/settings", "//pkg/spanconfig", "//pkg/util/interval", "//pkg/util/log", diff --git a/pkg/spanconfig/spanconfigstore/shadow.go b/pkg/spanconfig/spanconfigstore/shadow.go index c9625f19fdeb..98a0b042318c 100644 --- a/pkg/spanconfig/spanconfigstore/shadow.go +++ b/pkg/spanconfig/spanconfigstore/shadow.go @@ -21,6 +21,10 @@ import ( // ShadowReader wraps around two spanconfig.StoreReaders and logs warnings (if // expensive logging is enabled) when there are divergent results from the two. +// +// TODO(irfansharif): This was added as a convenient way to diagnose the +// differences between span configs infrastructure and system config span. +// Remove it when we actually start issuing RPCs (#71994) and have better tests.
type ShadowReader struct { new, old spanconfig.StoreReader } diff --git a/pkg/spanconfig/spanconfigstore/store.go b/pkg/spanconfig/spanconfigstore/store.go index 0f3d357fd7c3..b5eb8bd50c5b 100644 --- a/pkg/spanconfig/spanconfigstore/store.go +++ b/pkg/spanconfig/spanconfigstore/store.go @@ -15,15 +15,27 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/util/interval" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) +// EnabledSetting is a hidden cluster setting to enable the use of the span +// configs infrastructure in KV. It switches each store in the cluster from +// using the gossip backed system config span to instead using the span configs +// infrastructure. It has no effect unless COCKROACH_EXPERIMENTAL_SPAN_CONFIGS +// is set. +var EnabledSetting = settings.RegisterBoolSetting( + "spanconfig.experimental_store.enabled", + `use the span config infrastructure in KV instead of the system config span`, + false, +).WithSystemOnly() + // Store is an in-memory data structure to store and retrieve span configs. // Internally it makes use of an interval tree to store non-overlapping span -// configs. +// configs. It's safe for concurrent use. type Store struct { mu struct { syncutil.RWMutex diff --git a/pkg/spanconfig/testing_knobs.go b/pkg/spanconfig/testing_knobs.go index bf498332d19e..acde9d9f34b6 100644 --- a/pkg/spanconfig/testing_knobs.go +++ b/pkg/spanconfig/testing_knobs.go @@ -10,7 +10,10 @@ package spanconfig -import "github.com/cockroachdb/cockroach/pkg/base" +import ( + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) // TestingKnobs provide fine-grained control over the various span config // components for testing. 
@@ -31,6 +34,25 @@ type TestingKnobs struct { // manager has checked if the auto span config reconciliation job exists or // not. ManagerAfterCheckedReconciliationJobExistsInterceptor func(exists bool) + + // KVSubscriberPostRangefeedStartInterceptor is invoked after the rangefeed is started. + KVSubscriberPostRangefeedStartInterceptor func() + + // KVSubscriberPreExitInterceptor is invoked right before returning from + // subscribeInner, after tearing down internal components. + KVSubscriberPreExitInterceptor func() + + // KVSubscriberOnTimestampAdvanceInterceptor is invoked each time the + // KVSubscriber has processed all updates before the provided timestamp. + KVSubscriberOnTimestampAdvanceInterceptor func(hlc.Timestamp) + + // KVSubscriberErrorInjectionCh is a way for tests to conveniently inject + // buffer overflow errors into the subscriber in order to test recovery. + KVSubscriberErrorInjectionCh chan error + + // StoreKVSubscriberOverride is used to override the KVSubscriber used when + // setting up a new store. + StoreKVSubscriberOverride KVSubscriber } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go index 3fd5dc5fdbb1..5ddafb6fd02b 100644 --- a/pkg/sql/catalog/systemschema/system.go +++ b/pkg/sql/catalog/systemschema/system.go @@ -2245,7 +2245,14 @@ var ( ColumnIDs: []descpb.ColumnID{1, 2, 3}, }, }, - pk("start_key"), + descpb.IndexDescriptor{ + Name: "primary", + ID: keys.SpanConfigurationsTablePrimaryKeyIndexID, + Unique: true, + KeyColumnNames: []string{"start_key"}, + KeyColumnDirections: singleASC, + KeyColumnIDs: singleID1, + }, ), func(tbl *descpb.TableDescriptor) { tbl.Checks = []*descpb.TableDescriptor_CheckConstraint{{