diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 38d9147867af..c04272934912 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -181,6 +181,7 @@ ALL_TESTS = [ "//pkg/server:server_test", "//pkg/settings:settings_test", "//pkg/spanconfig/spanconfigkvaccessor:spanconfigkvaccessor_test", + "//pkg/spanconfig/spanconfigkvsubscriber:spanconfigkvsubscriber_test", "//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test", "//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test", "//pkg/spanconfig/spanconfigstore:spanconfigstore_test", diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index eaeda96e7a98..35def95cd9e7 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -357,10 +357,11 @@ const ( ZonesTableConfigColumnID = 2 ZonesTableConfigColFamID = 2 - DescriptorTablePrimaryKeyIndexID = 1 - DescriptorTableDescriptorColID = 2 - DescriptorTableDescriptorColFamID = 2 - TenantsTablePrimaryKeyIndexID = 1 + DescriptorTablePrimaryKeyIndexID = 1 + DescriptorTableDescriptorColID = 2 + DescriptorTableDescriptorColFamID = 2 + TenantsTablePrimaryKeyIndexID = 1 + SpanConfigurationsTablePrimaryKeyIndexID = 1 // Reserved IDs for other system tables. Note that some of these IDs refer // to "Ranges" instead of a Table - these IDs are needed to store custom diff --git a/pkg/keys/spans.go b/pkg/keys/spans.go index d611b2ee8a02..02e513ada545 100644 --- a/pkg/keys/spans.go +++ b/pkg/keys/spans.go @@ -13,6 +13,9 @@ package keys import "github.com/cockroachdb/cockroach/pkg/roachpb" var ( + // EverythingSpan is a span that covers everything. + EverythingSpan = roachpb.Span{Key: roachpb.KeyMin, EndKey: roachpb.KeyMax} + // Meta1Span holds all first level addressing records. Meta1Span = roachpb.Span{Key: roachpb.KeyMin, EndKey: Meta2Prefix} diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index 0ef98e9281ef..ed2e74ac109e 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -193,9 +193,9 @@ func (f *RangeFeed) Start(ctx context.Context) error { return nil } -// Close closes the RangeFeed and waits for it to shut down; it does -// idempotently. It's guaranteed that no future handlers will be invoked after -// this point. +// Close closes the RangeFeed and waits for it to shut down; it does so +// idempotently. It waits for the currently running handler, if any, to complete +// and guarantees that no future handlers will be invoked after this point. func (f *RangeFeed) Close() { f.closeOnce.Do(func() { f.cancel() diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 96aa6e48f6de..5a8171940b37 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -151,6 +151,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigstore", "//pkg/sql/sessiondata", "//pkg/sql/sqlutil", "//pkg/storage", @@ -218,6 +219,7 @@ go_test( "client_replica_backpressure_test.go", "client_replica_gc_test.go", "client_replica_test.go", + "client_spanconfigs_test.go", "client_split_burst_test.go", "client_split_test.go", "client_status_test.go", @@ -340,6 +342,7 @@ go_test( "//pkg/server/telemetry", "//pkg/settings/cluster", "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigstore", "//pkg/sql", "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catalogkeys", @@ -349,6 +352,8 @@ go_test( "//pkg/sql/catalog/tabledesc", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", + "//pkg/sql/sqlutil", "//pkg/storage", "//pkg/storage/enginepb", "//pkg/storage/fs", diff --git a/pkg/kv/kvserver/client_spanconfigs_test.go b/pkg/kv/kvserver/client_spanconfigs_test.go new file mode 100644 index 000000000000..74efc495d655 --- /dev/null +++ b/pkg/kv/kvserver/client_spanconfigs_test.go @@ -0,0 +1,119 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestSpanConfigUpdateAppliedToReplica ensures that when a store learns of a +// span config update, it installs the corresponding config on the right +// replica. +func TestSpanConfigUpdateAppliedToReplica(t *testing.T) { + defer leaktest.AfterTest(t)() + + spanConfigStore := spanconfigstore.New(roachpb.TestingDefaultSpanConfig()) + mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore) + + ctx := context.Background() + + args := base.TestServerArgs{ + EnableSpanConfigs: true, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + SpanConfig: &spanconfig.TestingKnobs{ + StoreKVSubscriberOverride: mockSubscriber, + }, + }, + } + s, _, _ := serverutils.StartServer(t, args) + defer s.Stopper().Stop(context.Background()) + + _, err := s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", nil, + sessiondata.InternalExecutorOverride{User: security.RootUserName()}, + `SET CLUSTER SETTING spanconfig.experimental_store.enabled = true`) + require.NoError(t, err) + + key, err := s.ScratchRange() + require.NoError(t, err) + store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + repl := store.LookupReplica(keys.MustAddr(key)) + span := repl.Desc().RSpan().AsRawSpanWithNoLocals() + conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3} + + deleted, added := spanConfigStore.Apply(ctx, spanconfig.Update{Span: span, Config: conf}, false /* dryrun */) + require.Empty(t, deleted) + require.Len(t, added, 1) + require.True(t, added[0].Span.Equal(span)) + require.True(t, added[0].Config.Equal(conf)) + + require.NotNil(t, mockSubscriber.callback) + mockSubscriber.callback(span) // invoke the callback + testutils.SucceedsSoon(t, func() error { + repl := store.LookupReplica(keys.MustAddr(key)) + gotConfig := repl.SpanConfig() + if !gotConfig.Equal(conf) { + return errors.Newf("expected config=%s, got config=%s", conf.String(), gotConfig.String()) + } + return nil + }) +} + +func newMockSpanConfigSubscriber(store spanconfig.Store) *mockSpanConfigSubscriber { + return &mockSpanConfigSubscriber{Store: store} +} + +type mockSpanConfigSubscriber struct { + callback func(config roachpb.Span) + spanconfig.Store +} + +func (m *mockSpanConfigSubscriber) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool { + return m.Store.NeedsSplit(ctx, start, end) +} + +func (m *mockSpanConfigSubscriber) ComputeSplitKey( + ctx context.Context, start, end roachpb.RKey, +) roachpb.RKey { + return m.Store.ComputeSplitKey(ctx, start, end) +} + +func (m *mockSpanConfigSubscriber) GetSpanConfigForKey( + ctx context.Context, key roachpb.RKey, +) (roachpb.SpanConfig, error) { + return m.Store.GetSpanConfigForKey(ctx, key) +} + +func (m *mockSpanConfigSubscriber) Subscribe(callback func(roachpb.Span)) { + m.callback = callback +} + +var _ spanconfig.KVSubscriber = &mockSpanConfigSubscriber{} diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index c3019eb04ab1..7c80312f6930 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -53,6 +53,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -217,7 +218,6 @@ func testStoreConfig(clock *hlc.Clock, version roachpb.Version) StoreConfig { tracer := tracing.NewTracerWithOpt(context.TODO(), tracing.WithClusterSettings(&st.SV)) sc := StoreConfig{ DefaultSpanConfig: zonepb.DefaultZoneConfigRef().AsSpanConfig(), - DefaultSystemSpanConfig: zonepb.DefaultSystemZoneConfigRef().AsSpanConfig(), Settings: st, AmbientCtx: log.AmbientContext{Tracer: tracer}, Clock: clock, @@ -651,6 +651,7 @@ type Store struct { computeInitialMetrics sync.Once systemConfigUpdateQueueRateLimiter *quotapool.RateLimiter + spanConfigUpdateQueueRateLimiter *quotapool.RateLimiter } var _ kv.Sender = &Store{} @@ -663,18 +664,17 @@ type StoreConfig struct { AmbientCtx log.AmbientContext base.RaftConfig - DefaultSpanConfig roachpb.SpanConfig - DefaultSystemSpanConfig roachpb.SpanConfig - Settings *cluster.Settings - Clock *hlc.Clock - DB *kv.DB - Gossip *gossip.Gossip - NodeLiveness *liveness.NodeLiveness - StorePool *StorePool - Transport *RaftTransport - NodeDialer *nodedialer.Dialer - RPCContext *rpc.Context - RangeDescriptorCache *rangecache.RangeCache + DefaultSpanConfig roachpb.SpanConfig + Settings *cluster.Settings + Clock *hlc.Clock + DB *kv.DB + Gossip *gossip.Gossip + NodeLiveness *liveness.NodeLiveness + StorePool *StorePool + Transport *RaftTransport + NodeDialer *nodedialer.Dialer + RPCContext *rpc.Context + RangeDescriptorCache *rangecache.RangeCache ClosedTimestampSender *sidetransport.Sender ClosedTimestampReceiver sidetransportReceiver @@ -759,6 +759,10 @@ type StoreConfig struct { // SpanConfigsEnabled determines whether we're able to use the span configs // infrastructure. SpanConfigsEnabled bool + // Used to subscribe to span configuration changes, keeping up-to-date a + // data structure useful for retrieving span configs. Only available if + // SpanConfigsEnabled. + SpanConfigSubscriber spanconfig.KVSubscriber // KVAdmissionController is an optional field used for admission control. KVAdmissionController KVAdmissionController @@ -1635,6 +1639,26 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { }) } + if s.cfg.SpanConfigsEnabled { + s.cfg.SpanConfigSubscriber.Subscribe(func(update roachpb.Span) { + s.onSpanConfigUpdate(ctx, update) + }) + + // When toggling between the system config span and the span configs + // infrastructure, we want to re-apply configs on all replicas from + // whatever the new source is. + spanconfigstore.EnabledSetting.SetOnChange(&s.ClusterSettings().SV, func(ctx context.Context) { + enabled := spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) + if enabled { + s.applyAllFromSpanConfigStore(ctx) + } else { + if s.cfg.Gossip != nil && s.cfg.Gossip.GetSystemConfig() != nil { + s.systemGossipUpdate(s.cfg.Gossip.GetSystemConfig()) + } + } + }) + } + if !s.cfg.TestingKnobs.DisableAutomaticLeaseRenewal { s.startLeaseRenewer(ctx) } @@ -1776,6 +1800,10 @@ func (s *Store) GetConfReader() (spanconfig.StoreReader, error) { return nil, errSysCfgUnavailable } + if s.cfg.SpanConfigsEnabled && spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) { + return spanconfigstore.NewShadowReader(s.cfg.SpanConfigSubscriber, sysCfg), nil + } + return sysCfg, nil } @@ -1930,11 +1958,11 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) { // We'll want to offer all replicas to the split and merge queues. Be a little // careful about not spawning too many individual goroutines. + shouldQueue := s.systemConfigUpdateQueueRateLimiter.AdmitN(1) // For every range, update its zone config and check if it needs to // be split or merged. now := s.cfg.Clock.NowAsClockTimestamp() - shouldQueue := s.systemConfigUpdateQueueRateLimiter.AdmitN(1) newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { key := repl.Desc().StartKey conf, err := sysCfg.GetSpanConfigForKey(ctx, key) @@ -1957,6 +1985,110 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) { }) } +// onSpanConfigUpdate is the callback invoked whenever this store learns of a +// span config update. +func (s *Store) onSpanConfigUpdate(ctx context.Context, updated roachpb.Span) { + if !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) { + return + } + + sp, err := keys.SpanAddr(updated) + if err != nil { + log.Errorf(ctx, "skipped applying update (%s), unexpected error resolving span address: %v", + updated, err) + return + } + + now := s.cfg.Clock.NowAsClockTimestamp() + if err := s.mu.replicasByKey.VisitKeyRange(ctx, sp.Key, sp.EndKey, AscendingKeyOrder, + func(ctx context.Context, it replicaOrPlaceholder) error { + repl := it.repl + if repl == nil { + return nil // placeholder; ignore + } + + startKey := repl.Desc().StartKey + if !sp.ContainsKey(startKey) { + // It's possible that the update we're receiving here is the + // right-hand side of a span config getting split. Think of + // installing a zone config on some partition of an index where + // previously there was none on any of the partitions. The range + // spanning the entire index would have to split on the + // partition boundary, and before it does so, it's possible that + // it would receive a span config update for just the partition. + // + // To avoid clobbering the pre-split range's embedded span + // config with the partition's config, we'll ensure that the + // range's start key is part of the update. We don't have to + // enqueue the range in the split queue here, that takes place + // when processing the left-hand side span config update. + + return nil // ignore + } + + // TODO(irfansharif): It's possible for a config to be applied over an + // entire range when it only pertains to the first half of the range. + // This will be corrected shortly -- we enqueue the range for a split + // below where we then apply the right config on each half. But still, + // it's surprising behavior and gets in the way of a desirable + // consistency guarantee: a key's config at any point in time is one + // that was explicitly declared over it, or the default config. + // + // We can do better, we can skip applying the config entirely and + // enqueue the split, then relying on the split trigger to install + // the right configs on each half. The current structure is as it is + // to maintain parity with the system config span variant. + + replCtx := repl.AnnotateCtx(ctx) + conf, err := s.cfg.SpanConfigSubscriber.GetSpanConfigForKey(replCtx, startKey) + if err != nil { + log.Errorf(ctx, "skipped applying update, unexpected error reading from subscriber: %v", err) + return err + } + repl.SetSpanConfig(conf) + + // TODO(irfansharif): For symmetry with the system config span variant, + // we queue blindly; we could instead only queue it if we knew the + // range's keyspans has a split in there somewhere, or was now part of a + // larger range and eligible for a merge. + s.splitQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + s.mergeQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + return nil // more + }, + ); err != nil { + // Errors here should not be possible, but if there is one, log loudly. + log.Errorf(ctx, "unexpected error visiting replicas: %v", err) + } +} + +// applyAllFromSpanConfigStore applies, on each replica, span configs from the +// embedded span config store. +func (s *Store) applyAllFromSpanConfigStore(ctx context.Context) { + now := s.cfg.Clock.NowAsClockTimestamp() + newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { + replCtx := repl.AnnotateCtx(ctx) + key := repl.Desc().StartKey + conf, err := s.cfg.SpanConfigSubscriber.GetSpanConfigForKey(replCtx, key) + if err != nil { + log.Errorf(ctx, "skipped applying config update, unexpected error reading from subscriber: %v", err) + return true // more + } + + repl.SetSpanConfig(conf) + s.splitQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + s.mergeQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + return true // more + }) +} + func (s *Store) asyncGossipStore(ctx context.Context, reason string, useCached bool) { if err := s.stopper.RunAsyncTask( ctx, fmt.Sprintf("storage.Store: gossip on %s", reason), diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 786b01a86c98..eba3f2c6028d 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -327,6 +328,9 @@ type StoreTestingKnobs struct { // PurgeOutdatedReplicasInterceptor intercepts attempts to purge outdated // replicas in the store. PurgeOutdatedReplicasInterceptor func() + // SpanConfigUpdateInterceptor is called after the store hears about a span + // config update. + SpanConfigUpdateInterceptor func(spanconfig.Update) // If set, use the given version as the initial replica version when // bootstrapping ranges. This is used for testing the migration // infrastructure. diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 867dce5b597f..80ac4fd11f22 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -113,6 +113,7 @@ go_library( "//pkg/spanconfig", "//pkg/spanconfig/spanconfigjob", "//pkg/spanconfig/spanconfigkvaccessor", + "//pkg/spanconfig/spanconfigkvsubscriber", "//pkg/spanconfig/spanconfigmanager", "//pkg/spanconfig/spanconfigsqltranslator", "//pkg/sql", diff --git a/pkg/server/server.go b/pkg/server/server.go index d805f5e54517..480db62f0864 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -71,6 +71,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" _ "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob" // register jobs declared outside of pkg/sql "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/contention" @@ -177,6 +178,8 @@ type Server struct { protectedtsProvider protectedts.Provider protectedtsReconciler *ptreconcile.Reconciler + spanConfigSubscriber *spanconfigkvsubscriber.KVSubscriber + sqlServer *SQLServer drainSleepFn func(time.Duration) @@ -612,8 +615,26 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { } var spanConfigAccessor spanconfig.KVAccessor + var spanConfigSubscriber *spanconfigkvsubscriber.KVSubscriber if cfg.SpanConfigsEnabled { storeCfg.SpanConfigsEnabled = true + spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs) + if spanConfigKnobs != nil && spanConfigKnobs.StoreKVSubscriberOverride != nil { + storeCfg.SpanConfigSubscriber = spanConfigKnobs.StoreKVSubscriberOverride + } else { + spanConfigSubscriber = spanconfigkvsubscriber.New( + stopper, + db, + clock, + rangeFeedFactory, + keys.SpanConfigurationsTableID, + 1<<20, /* 1 MB */ + storeCfg.DefaultSpanConfig, + spanConfigKnobs, + ) + storeCfg.SpanConfigSubscriber = spanConfigSubscriber + } + spanConfigAccessor = spanconfigkvaccessor.New( db, internalExecutor, cfg.Settings, systemschema.SpanConfigurationsTableName.FQString(), @@ -808,6 +829,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { replicationReporter: replicationReporter, protectedtsProvider: protectedtsProvider, protectedtsReconciler: protectedtsReconciler, + spanConfigSubscriber: spanConfigSubscriber, sqlServer: sqlServer, drainSleepFn: drainSleepFn, externalStorageBuilder: externalStorageBuilder, @@ -1726,6 +1748,11 @@ func (s *Server) PreStart(ctx context.Context) error { return err } + if s.cfg.SpanConfigsEnabled && s.spanConfigSubscriber != nil { + if err := s.spanConfigSubscriber.Start(ctx); err != nil { + return err + } + } // Start garbage collecting system events. // // NB: As written, this falls awkwardly between SQL and KV. KV is used only diff --git a/pkg/spanconfig/spanconfig.go b/pkg/spanconfig/spanconfig.go index 06cb134ef1cd..97e4c6dc7f72 100644 --- a/pkg/spanconfig/spanconfig.go +++ b/pkg/spanconfig/spanconfig.go @@ -24,7 +24,10 @@ import ( type KVAccessor interface { // GetSpanConfigEntriesFor returns the span configurations that overlap with // the given spans. - GetSpanConfigEntriesFor(ctx context.Context, spans []roachpb.Span) ([]roachpb.SpanConfigEntry, error) + GetSpanConfigEntriesFor( + ctx context.Context, + spans []roachpb.Span, + ) ([]roachpb.SpanConfigEntry, error) // UpdateSpanConfigEntries updates configurations for the given spans. This // is a "targeted" API: the spans being deleted are expected to have been @@ -33,35 +36,72 @@ type KVAccessor interface { // divvying up an existing span into multiple others with distinct configs, // callers are to issue a delete for the previous span and upserts for the // new ones. - UpdateSpanConfigEntries(ctx context.Context, toDelete []roachpb.Span, toUpsert []roachpb.SpanConfigEntry) error + UpdateSpanConfigEntries( + ctx context.Context, + toDelete []roachpb.Span, + toUpsert []roachpb.SpanConfigEntry, + ) error +} + +// KVSubscriber presents a consistent[1] snapshot of a StoreReader that's +// incrementally maintained with changes made to the global span configurations +// state (system.span_configurations). The maintenance happens transparently; +// callers can subscribe to learn about what key spans may have seen a +// configuration change. After learning about a span update through a callback +// invocation, subscribers can consult the embedded StoreReader to retrieve an +// up-to-date[2] config for the updated span. The callback is called in a single +// goroutine; it should avoid doing any long-running or blocking work. +// +// When a callback is first installed, it's invoked with the [min,max) span -- +// a shorthand to indicate that subscribers should consult the StoreReader for all +// spans of interest. Subsequent updates are of the more incremental kind. It's +// possible that the span updates received are no-ops, i.e. consulting the +// StoreReader for the given span would still retrieve the last config observed +// for the span[3]. +// +// [1]: The contents of the StoreReader at t1 corresponds exactly to the +// contents of the global span configuration state at t0 where t0 <= t1. If +// the StoreReader is read from at t2 where t2 > t1, it's guaranteed to +// observe a view of the global state at t >= t0. +// [2]: For the canonical KVSubscriber implementation, this is typically lagging +// by the closed timestamp target duration. +// [3]: The canonical KVSubscriber implementation is bounced whenever errors +// occur, which may result in the re-transmission of earlier updates +// (typically through a coarsely targeted [min,max) span). +type KVSubscriber interface { + StoreReader + Subscribe(func(updated roachpb.Span)) } // SQLTranslator translates SQL descriptors and their corresponding zone // configurations to constituent spans and span configurations. // // Concretely, for the following zone configuration hierarchy: +// // CREATE DATABASE db; // CREATE TABLE db.t1(); // ALTER DATABASE db CONFIGURE ZONE USING num_replicas=7; // ALTER TABLE db.t1 CONFIGURE ZONE USING num_voters=5; +// // The SQLTranslator produces the following translation (represented as a diff // against RANGE DEFAULT for brevity): +// // Table/5{3-4} num_replicas=7 num_voters=5 type SQLTranslator interface { - // Translate generates the implied span configuration state given a list of // Translate generates the span configuration state given a list of - // {descriptor, named zone} IDs. No entry is returned for an ID if it doesn't - // exist or has been dropped. The timestamp at which the translation is valid - // is also returned. + // {descriptor, named zone} IDs. No entry is returned for an ID if it + // doesn't exist or if it's dropped. The timestamp at which the translation + // is valid is also returned. // - // For every ID we first descend the zone configuration hierarchy with the ID - // as the root to accumulate IDs of all leaf objects. Leaf objects are tables - // and named zones (other than RANGE DEFAULT) which have actual span - // configurations associated with them (as opposed to non-leaf nodes that only - // serve to hold zone configurations for inheritance purposes). Then, for - // for every one of these accumulated IDs, we generate - // tuples by following up the inheritance chain to fully hydrate the span - // configuration. Translate also accounts for and negotiates subzone spans. + // For every ID we first descend the zone configuration hierarchy with the + // ID as the root to accumulate IDs of all leaf objects. Leaf objects are + // tables and named zones (other than RANGE DEFAULT) which have actual span + // configurations associated with them (as opposed to non-leaf nodes that + // only serve to hold zone configurations for inheritance purposes). Then, + // for each one of these accumulated IDs, we generate tuples by following up the inheritance chain to fully hydrate the + // span configuration. Translate also accounts for and negotiates subzone + // spans. Translate(ctx context.Context, ids descpb.IDs) ([]roachpb.SpanConfigEntry, hlc.Timestamp, error) } @@ -178,8 +218,8 @@ type StoreReader interface { GetSpanConfigForKey(ctx context.Context, key roachpb.RKey) (roachpb.SpanConfig, error) } -// Update captures what span has seen a config change. It will be the unit of -// what a {SQL,KV}Watcher emits, and what can be applied to a StoreWriter. +// Update captures a span and the corresponding config change. It's the unit of +// what can be applied to a StoreWriter. type Update struct { // Span captures the key span being updated. Span roachpb.Span diff --git a/pkg/spanconfig/spanconfigjob/job.go b/pkg/spanconfig/spanconfigjob/job.go index c748b70554c4..fa461a0d11cb 100644 --- a/pkg/spanconfig/spanconfigjob/job.go +++ b/pkg/spanconfig/spanconfigjob/job.go @@ -30,8 +30,8 @@ var _ jobs.Resumer = (*resumer)(nil) func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) error { execCtx := execCtxI.(sql.JobExecContext) rc := execCtx.SpanConfigReconciliationJobDeps() - // TODO(zcfg-pod): Upcoming PRs will actually make use of these reconciliation - // dependencies. + + // TODO(irfansharif): Actually make use of these dependencies. _ = rc <-ctx.Done() diff --git a/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel b/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel new file mode 100644 index 000000000000..2790e0aba9fe --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel @@ -0,0 +1,76 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "spanconfigkvsubscriber", + srcs = [ + "kvsubscriber.go", + "span_config_decoder.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber", + visibility = ["//visibility:public"], + deps = [ + "//pkg/base", + "//pkg/keys", + "//pkg/kv", + "//pkg/kv/kvclient/rangefeed:with-mocks", + "//pkg/kv/kvclient/rangefeed/rangefeedbuffer", + "//pkg/roachpb:with-mocks", + "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigstore", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/systemschema", + "//pkg/sql/row", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", + "//pkg/sql/types", + "//pkg/util/encoding", + "//pkg/util/grpcutil", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/protoutil", + "//pkg/util/retry", + "//pkg/util/stop", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "spanconfigkvsubscriber_test", + srcs = [ + "datadriven_test.go", + "kvsubscriber_test.go", + "main_test.go", + "span_config_decoder_test.go", + ], + data = glob(["testdata/**"]), + embed = [":spanconfigkvsubscriber"], + deps = [ + "//pkg/base", + "//pkg/keys", + "//pkg/kv/kvclient/rangefeed:with-mocks", + "//pkg/kv/kvclient/rangefeed/rangefeedbuffer", + "//pkg/roachpb:with-mocks", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigkvaccessor", + "//pkg/spanconfig/spanconfigtestutils", + "//pkg/sql/sqlutil", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/protoutil", + "//pkg/util/syncutil", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go b/pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go new file mode 100644 index 000000000000..5ec6d0376ed8 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go @@ -0,0 +1,312 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvsubscriber_test + +import ( + "context" + "fmt" + "sort" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestDataDriven runs datadriven tests against the KVSubscriber interface. +// The syntax is as follows: +// +// update +// delete [c,e) +// upsert [c,d):C +// upsert [d,e):D +// ---- +// ok +// +// get +// span [a,b) +// span [b,c) +// ---- +// [a,b):A +// [b,d):B +// +// start +// ---- +// +// updates +// ---- +// [a,b) +// [b,d) +// [e,f) +// +// store-reader key=b +// ---- +// [b,d):B +// +// store-reader compute-split=[a,c) +// ---- +// b +// +// store-reader needs-split=[b,h) +// ---- +// true +// +// inject-buffer-overflow +// ---- +// ok +// +// - update and get tie into GetSpanConfigEntriesFor and +// UpdateSpanConfigEntries respectively on the KVAccessor interface, and are a +// convenient shorthand to populate the system table that the KVSubscriber +// subscribes to. The input is processed in a single batch. +// - start starts the subscription process. It can also be used to verify +// behavior when re-establishing subscriptions after hard errors. +// - updates lists the span updates the KVSubscriber receives, in the listed +// order. Updates in a batch are de-duped. +// - store-reader {key,compute-split,needs-split} relate to GetSpanConfigForKey, +// ComputeSplitKey and NeedsSplit respectively on the StoreReader subset of the +// KVSubscriber interface. +// - inject-buffer-overflow can be used to inject rangefeed buffer overflow +// errors within the kvsubscriber. It pokes into the internals of the +// kvsubscriber and is useful to test teardown and recovery behavior. +// +// Text of the form [a,b) and [a,b):C correspond to spans and span config +// entries; see spanconfigtestutils.Parse{Span,Config,SpanConfigEntry} for more +// details. +func TestDataDriven(t *testing.T) { + defer leaktest.AfterTest(t)() + + datadriven.Walk(t, testutils.TestDataPath(t), func(t *testing.T, path string) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + EnableSpanConfigs: true, + }, + }) + defer cancel() + defer tc.Stopper().Stop(ctx) + + ts := tc.Server(0) + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + tdb.Exec(t, `SET CLUSTER SETTING spanconfig.experimental_kvaccessor.enabled = true`) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) + + const dummyTableName = "dummy_span_configurations" + tdb.Exec(t, fmt.Sprintf("CREATE TABLE %s (LIKE system.span_configurations INCLUDING ALL)", dummyTableName)) + + var dummyTableID uint32 + tdb.QueryRow(t, fmt.Sprintf( + `SELECT table_id from crdb_internal.tables WHERE name = '%s'`, dummyTableName), + ).Scan(&dummyTableID) + + kvAccessor := spanconfigkvaccessor.New( + tc.Server(0).DB(), + tc.Server(0).InternalExecutor().(sqlutil.InternalExecutor), + tc.Server(0).ClusterSettings(), + fmt.Sprintf("defaultdb.public.%s", dummyTableName), + ) + + mu := struct { + syncutil.Mutex + lastFrontierTS hlc.Timestamp // serializes updates and update + subscriberRunning bool // serializes updates, subscribe, and inject-buffer-overflow + receivedUpdates roachpb.Spans + }{} + injectedErrCh := make(chan error) + + kvSubscriber := spanconfigkvsubscriber.New( + ts.Stopper(), + ts.DB(), + ts.Clock(), + ts.RangeFeedFactory().(*rangefeed.Factory), + dummyTableID, + 10<<20, /* 10 MB */ + spanconfigtestutils.ParseConfig(t, "MISSING"), + &spanconfig.TestingKnobs{ + KVSubscriberOnTimestampAdvanceInterceptor: func(ts hlc.Timestamp) { + mu.Lock() + defer mu.Unlock() + mu.lastFrontierTS = ts + }, + KVSubscriberPostRangefeedStartInterceptor: func() { + mu.Lock() + defer mu.Unlock() + mu.subscriberRunning = true + }, + KVSubscriberPreExitInterceptor: func() { + mu.Lock() + defer mu.Unlock() + mu.subscriberRunning = false + }, + KVSubscriberErrorInjectionCh: injectedErrCh, + }, + ) + + kvSubscriber.Subscribe(func(span roachpb.Span) { + mu.Lock() + defer mu.Unlock() + mu.receivedUpdates = append(mu.receivedUpdates, span) + }) + + var lastUpdateTS hlc.Timestamp + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "get": + spans := spanconfigtestutils.ParseKVAccessorGetArguments(t, d.Input) + entries, err := kvAccessor.GetSpanConfigEntriesFor(ctx, spans) + require.NoError(t, err) + + var output strings.Builder + for _, entry := range entries { + output.WriteString(fmt.Sprintf("%s\n", spanconfigtestutils.PrintSpanConfigEntry(entry))) + } + return output.String() + + case "update": + toDelete, toUpsert := spanconfigtestutils.ParseKVAccessorUpdateArguments(t, d.Input) + require.NoError(t, kvAccessor.UpdateSpanConfigEntries(ctx, toDelete, toUpsert)) + lastUpdateTS = ts.Clock().Now() + + case "start": + mu.Lock() + require.False(t, mu.subscriberRunning, "subscriber already running") + mu.Unlock() + + go func() { + _ = kvSubscriber.TestingRunInner(ctx) + }() + testutils.SucceedsSoon(t, func() error { + mu.Lock() + defer mu.Unlock() + if !mu.subscriberRunning { + return errors.New("expected subscriber to have started") + } + return nil + }) + + case "updates": + testutils.SucceedsSoon(t, func() error { + mu.Lock() + defer mu.Unlock() + + if !mu.subscriberRunning { + // The subscriber isn't running, we're not expecting any + // frontier bumps. + return nil + } + + // The subscriber is running -- we should be observing + // frontier bumps. In order to serialize after the last + // kvaccessor-update, lets wait until the frontier timestamp + // is past it. + if lastUpdateTS.LessEq(mu.lastFrontierTS) { + return nil + } + + return errors.Newf("frontier timestamp (%s) lagging last update (%s)", + mu.lastFrontierTS.String(), lastUpdateTS.String()) + }) // TODO(irfansharif): We could use a tighter bound here, but it's unreliable under stress. + + mu.Lock() + receivedUpdates := mu.receivedUpdates + mu.receivedUpdates = mu.receivedUpdates[:0] // clear out buffer + mu.Unlock() + + var output strings.Builder + sort.Sort(receivedUpdates) + for i, update := range receivedUpdates { + if i != 0 && receivedUpdates[i].Equal(receivedUpdates[i-1]) { + continue // de-dup updates + } + + var spanStr string + if update.Equal(keys.EverythingSpan) { + spanStr = update.String() + } else { + spanStr = spanconfigtestutils.PrintSpan(update) + } + output.WriteString(fmt.Sprintf("%s\n", spanStr)) + } + + return output.String() + + case "inject-buffer-overflow": + injectedErrCh <- rangefeedbuffer.ErrBufferLimitExceeded + testutils.SucceedsSoon(t, func() error { + mu.Lock() + defer mu.Unlock() + if mu.subscriberRunning { + return errors.New("expected subscriber to have stopped") + } + return nil + + }) + + case "store-reader": + if len(d.CmdArgs) != 1 { + d.Fatalf(t, "unexpected number of args (%d), expected 1", len(d.CmdArgs)) + } + cmdArg := d.CmdArgs[0] + + switch cmdArg.Key { + case "key": + var keyStr string + d.ScanArgs(t, cmdArg.Key, &keyStr) + config, err := kvSubscriber.GetSpanConfigForKey(ctx, roachpb.RKey(keyStr)) + require.NoError(t, err) + return fmt.Sprintf("conf=%s", spanconfigtestutils.PrintSpanConfig(config)) + + case "compute-split": + var spanStr string + d.ScanArgs(t, cmdArg.Key, &spanStr) + span := spanconfigtestutils.ParseSpan(t, spanStr) + start, end := roachpb.RKey(span.Key), roachpb.RKey(span.EndKey) + splitKey := kvSubscriber.ComputeSplitKey(ctx, start, end) + return string(splitKey) + + case "needs-split": + var spanStr string + d.ScanArgs(t, cmdArg.Key, &spanStr) + span := spanconfigtestutils.ParseSpan(t, spanStr) + start, end := roachpb.RKey(span.Key), roachpb.RKey(span.EndKey) + result := kvSubscriber.NeedsSplit(ctx, start, end) + return fmt.Sprintf("%t", result) + + default: + d.Fatalf(t, "unknown argument: %s", cmdArg.Key) + } + + default: + d.Fatalf(t, "unknown command: %s", d.Cmd) + } + return "" + }) + }) +} diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go new file mode 100644 index 000000000000..77fca41a25c1 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go @@ -0,0 +1,446 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvsubscriber + +import ( + "context" + "strings" + "sync/atomic" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" + "github.com/cockroachdb/cockroach/pkg/util/grpcutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +// KVSubscriber is used to subscribe to global span configuration changes. It's +// a concrete implementation of the spanconfig.KVSubscriber interface. +// +// It's expected to Start-ed once, after which one or many subscribers can +// listen in for updates. Internally we maintain a rangefeed over the global +// store of span configurations (system.span_configurations), applying updates +// from it into an internal spanconfig.Store. A read-only view of this data +// structure (spanconfig.StoreReader) is exposed as part of the KVSubscriber +// interface. Rangefeeds used as is don't offer any ordering guarantees with +// respect to updates made over non-overlapping keys, which is something we care +// about[1]. For that reason we make use of a rangefeed buffer, accumulating raw +// rangefeed updates and flushing them out en-masse in timestamp order when the +// rangefeed frontier is bumped[2]. If the buffer overflows (as dictated by the +// memory limit the KVSubscriber is instantiated with), the old rangefeed is +// wound down and a new one re-established. +// +// When running into the internal errors described above, it's safe for us to +// re-establish the underlying rangefeeds. When re-establishing a new rangefeed +// and populating a spanconfig.Store using the contents of the initial scan[3], +// we wish to preserve the existing spanconfig.StoreReader. Discarding it would +// entail either blocking all external readers until a new +// spanconfig.StoreReader was fully populated, or presenting an inconsistent +// view of the spanconfig.Store that's currently being populated. For new +// rangefeeds what we do then is route all updates from the initial scan to a +// fresh spanconfig.Store, and once the initial scan is done, swap at the source +// for the exported spanconfig.StoreReader. During the initial scan, concurrent +// readers would continue to observe the last spanconfig.StoreReader if any. +// After the swap, it would observe the more up-to-date source instead. Future +// incremental updates will also target the new source. When this source swap +// occurs, we inform the handler of the need to possibly refresh its view of all +// configs. +// +// TODO(irfansharif): When swapping the old spanconfig.StoreReader for the new, +// instead of informing registered handlers with an everything [min,max) span, +// we could diff the two data structures and only emit targeted updates. +// +// [1]: For a given key k, it's config may be stored as part of a larger span S +// (where S.start <= k < S.end). It's possible for S to get deleted and +// replaced with sub-spans S1...SN in the same transaction if the span is +// getting split. When applying these updates, we need to make sure to +// process the deletion event for S before processing S1...SN. +// [2]: In our example above deleting the config for S and adding configs for +// S1...SN we want to make sure that we apply the full set of updates all +// at once -- lest we expose the intermediate state where the config for S +// was deleted but the configs for S1...SN were not yet applied. +// [3]: TODO(irfansharif): When tearing down the subscriber due to underlying +// errors, we could also capture a checkpoint to use the next time the +// subscriber is established. That way we can avoid the full initial scan +// over the span configuration state and simply pick up where we left off +// with our existing spanconfig.Store. +type KVSubscriber struct { + stopper *stop.Stopper + db *kv.DB + clock *hlc.Clock + rangefeedFactory *rangefeed.Factory + decoder *spanConfigDecoder + spanConfigTableSpan roachpb.Span // typically system.span_configurations, but overridable for tests + bufferMemLimit int64 + fallback roachpb.SpanConfig + knobs *spanconfig.TestingKnobs + + started int32 // accessed atomically + mu struct { // serializes between Start and external threads + syncutil.RWMutex + // internal is the internal spanconfig.Store maintained by the + // KVSubscriber. A read-only view over this store is exposed as part of + // the interface. When re-subscribing, a fresh spanconfig.Store is + // populated while the exposed spanconfig.StoreReader appears static. + // Once sufficiently caught up, the fresh spanconfig.Store is swapped in + // and the old discarded. See type-level comment for more details. + internal spanconfig.Store + handlers []handler + } + + lastFrontierTS hlc.Timestamp // used to assert monotonicity across subscription attempts +} + +var _ spanconfig.KVSubscriber = &KVSubscriber{} + +// spanConfigurationsTableRowSize is an estimate of the size of a single row in +// the system.span_configurations table (size of start/end key, and size of a +// marshaled span config proto). The value used here was pulled out of thin air +// -- it only serves to coarsely limit how large the KVSubscriber's underlying +// rangefeed buffer can get. +const spanConfigurationsTableRowSize = 5 << 10 // 5 KB + +// New instantiates a KVSubscriber. +func New( + stopper *stop.Stopper, + db *kv.DB, + clock *hlc.Clock, + rangeFeedFactory *rangefeed.Factory, + spanConfigurationsTableID uint32, + bufferMemLimit int64, + fallback roachpb.SpanConfig, + knobs *spanconfig.TestingKnobs, +) *KVSubscriber { + spanConfigTableStart := keys.SystemSQLCodec.IndexPrefix( + spanConfigurationsTableID, + keys.SpanConfigurationsTablePrimaryKeyIndexID, + ) + spanConfigTableSpan := roachpb.Span{ + Key: spanConfigTableStart, + EndKey: spanConfigTableStart.PrefixEnd(), + } + spanConfigStore := spanconfigstore.New(fallback) + if knobs == nil { + knobs = &spanconfig.TestingKnobs{} + } + s := &KVSubscriber{ + stopper: stopper, + db: db, + clock: clock, + bufferMemLimit: bufferMemLimit, + rangefeedFactory: rangeFeedFactory, + spanConfigTableSpan: spanConfigTableSpan, + fallback: fallback, + knobs: knobs, + decoder: newSpanConfigDecoder(), + } + s.mu.internal = spanConfigStore + return s +} + +// Start establishes a subscription (internally: rangefeed) over the global +// store of span configs. It fires off an async task to do so, re-establishing +// internally when retryable errors[1] occur and stopping only when the surround +// stopper is quiescing or the context canceled. All installed handlers are +// invoked in the single async task thread. +// +// [1]: It's possible for retryable errors to occur internally, at which point +// we tear down the existing subscription and re-establish another. When +// unsubscribed, the exposed spanconfig.StoreReader continues to be +// readable (though no longer incrementally maintained -- the view gets +// progressively staler overtime). Existing handlers are kept intact and +// notified when the subscription is re-established. After re-subscribing, +// the exported StoreReader will be up-to-date and continue to be +// incrementally maintained. +func (s *KVSubscriber) Start(ctx context.Context) error { + return s.stopper.RunAsyncTask(ctx, "spanconfig-kvsubscriber", func(ctx context.Context) { + ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx) + defer cancel() + + const aWhile = 5 * time.Minute // arbitrary but much longer than a retry + for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { + + started := timeutil.Now() + if err := s.run(ctx); err != nil { + if errors.Is(err, context.Canceled) { + return // we're done here + } + + if timeutil.Since(started) > aWhile { + r.Reset() + } + + log.Warningf(ctx, "spanconfig-kvsubscriber failed with %v, retrying...", err) + continue + } + + return // we're done here (the stopper was stopped, run exited cleanly) + } + }) +} + +// run establishes a rangefeed over the global store of span configs. +// This is a blocking operation, returning (and unsubscribing) only when the +// surrounding stopper is stopped, the context canceled, or when a retryable +// error occurs. For the latter, it's expected that callers will re-run the +// subscriber. +func (s *KVSubscriber) run(ctx context.Context) error { + if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { + log.Fatal(ctx, "currently started: only allowed once at any point in time") + } + if fn := s.knobs.KVSubscriberPreExitInterceptor; fn != nil { + defer fn() + } + defer func() { atomic.StoreInt32(&s.started, 0) }() + + buffer := rangefeedbuffer.New(int(s.bufferMemLimit / spanConfigurationsTableRowSize)) + frontierBumpedCh, initialScanDoneCh, errCh := make(chan struct{}), make(chan struct{}), make(chan error) + mu := struct { // serializes access between the rangefeed and the main thread here + syncutil.Mutex + frontierTS hlc.Timestamp + }{} + + defer func() { + mu.Lock() + s.lastFrontierTS.Forward(mu.frontierTS) + mu.Unlock() + }() + + onValue := func(ctx context.Context, ev *roachpb.RangeFeedValue) { + deleted := !ev.Value.IsPresent() + var value roachpb.Value + if deleted { + if !ev.PrevValue.IsPresent() { + // It's possible to write a KV tombstone on top of another KV + // tombstone -- both the new and old value will be empty. We simply + // ignore these events. + return + } + + // Since the end key is not part of the primary key, we need to + // decode the previous value in order to determine what it is. + value = ev.PrevValue + } else { + value = ev.Value + } + entry, err := s.decoder.decode(roachpb.KeyValue{ + Key: ev.Key, + Value: value, + }) + if err != nil { + log.Fatalf(ctx, "failed to decode row: %v", err) // non-retryable error; just fatal + } + + if log.ExpensiveLogEnabled(ctx, 1) { + log.Infof(ctx, "received span configuration update for %s (deleted=%t)", entry.Span, deleted) + } + + update := spanconfig.Update{Span: entry.Span} + if !deleted { + update.Config = entry.Config + } + + if err := buffer.Add(ctx, &bufferEvent{update, ev.Value.Timestamp}); err != nil { + select { + case <-ctx.Done(): + // The context is canceled when the rangefeed is closed by the + // main handler goroutine. It's closed after we stop listening + // to errCh. + case errCh <- err: + } + } + } + + initialScanTS := s.clock.Now() + if initialScanTS.Less(s.lastFrontierTS) { + log.Fatalf(ctx, "initial scan timestamp (%s) regressed from last recorded frontier (%s)", initialScanTS, s.lastFrontierTS) + } + + rangeFeed := s.rangefeedFactory.New("spanconfig-rangefeed", s.spanConfigTableSpan, initialScanTS, + onValue, + rangefeed.WithInitialScan(func(ctx context.Context) { + select { + case <-ctx.Done(): + // The context is canceled when the rangefeed is closed by the + // main handler goroutine. It's closed after we stop listening + // to initialScanDoneCh. + case initialScanDoneCh <- struct{}{}: + } + }), + rangefeed.WithOnFrontierAdvance(func(ctx context.Context, frontierTS hlc.Timestamp) { + mu.Lock() + mu.frontierTS = frontierTS + mu.Unlock() + + select { + case <-ctx.Done(): + case frontierBumpedCh <- struct{}{}: + } + }), + rangefeed.WithDiff(), + rangefeed.WithOnInitialScanError(func(ctx context.Context, err error) (shouldFail bool) { + // TODO(irfansharif): Consider if there are other errors which we + // want to treat as permanent. This was cargo culted from the + // settings watcher. + if grpcutil.IsAuthError(err) || + strings.Contains(err.Error(), "rpc error: code = Unauthenticated") { + return true + } + return false + }), + ) + if err := rangeFeed.Start(ctx); err != nil { + return err + } + defer rangeFeed.Close() + if fn := s.knobs.KVSubscriberPostRangefeedStartInterceptor; fn != nil { + fn() + } + + log.Info(ctx, "established range feed over span configurations table") + + injectedErrCh := s.knobs.KVSubscriberErrorInjectionCh + + for { + select { + case <-s.stopper.ShouldQuiesce(): + return nil + case <-ctx.Done(): + return ctx.Err() + case <-frontierBumpedCh: + mu.Lock() + frontierTS := mu.frontierTS + mu.Unlock() + + events := buffer.Flush(ctx, frontierTS) + s.mu.Lock() + for _, ev := range events { + s.mu.internal.Apply(ctx, ev.(*bufferEvent).Update, false /* dryrun */) + } + handlers := s.mu.handlers + s.mu.Unlock() + + for _, h := range handlers { + for _, ev := range events { + h.invoke(ev.(*bufferEvent).Update.Span) + } + } + + if fn := s.knobs.KVSubscriberOnTimestampAdvanceInterceptor; fn != nil { + fn(frontierTS) + } + case <-initialScanDoneCh: + events := buffer.Flush(ctx, initialScanTS) + freshStore := spanconfigstore.New(s.fallback) + for _, ev := range events { + freshStore.Apply(ctx, ev.(*bufferEvent).Update, false /* dryrun */) + } + + s.mu.Lock() + s.mu.internal = freshStore + handlers := s.mu.handlers + s.mu.Unlock() + + for _, h := range handlers { + // When re-establishing a rangefeed, it's possible we have a + // spanconfig.Store with arbitrary updates from what was + // exported last. Let's inform the handler than everything needs + // to be checked again. + h.invoke(keys.EverythingSpan) + } + + if fn := s.knobs.KVSubscriberOnTimestampAdvanceInterceptor; fn != nil { + fn(initialScanTS) + } + case err := <-errCh: + return err + case err := <-injectedErrCh: + return err + } + } +} + +// Subscribe installs a callback that's invoked with whatever span may have seen +// a config update. +func (s *KVSubscriber) Subscribe(fn func(roachpb.Span)) { + s.mu.Lock() + defer s.mu.Unlock() + + s.mu.handlers = append(s.mu.handlers, handler{fn: fn}) +} + +// NeedsSplit is part of the spanconfig.KVSubscriber interface. +func (s *KVSubscriber) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.mu.internal.NeedsSplit(ctx, start, end) +} + +// ComputeSplitKey is part of the spanconfig.KVSubscriber interface. +func (s *KVSubscriber) ComputeSplitKey(ctx context.Context, start, end roachpb.RKey) roachpb.RKey { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.mu.internal.ComputeSplitKey(ctx, start, end) +} + +// GetSpanConfigForKey is part of the spanconfig.KVSubscriber interface. +func (s *KVSubscriber) GetSpanConfigForKey( + ctx context.Context, key roachpb.RKey, +) (roachpb.SpanConfig, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.mu.internal.GetSpanConfigForKey(ctx, key) +} + +type handler struct { + initialized bool // tracks whether we need to invoke with a [min,max) span first + fn func(update roachpb.Span) +} + +func (h handler) invoke(update roachpb.Span) { + if !h.initialized { + h.fn(keys.EverythingSpan) + h.initialized = true + + if update.Equal(keys.EverythingSpan) { + return // we can opportunistically avoid re-invoking with the same update + } + } + + h.fn(update) +} + +type bufferEvent struct { + spanconfig.Update + ts hlc.Timestamp +} + +// Timestamp implements the rangefeedbuffer.Event interface. +func (w *bufferEvent) Timestamp() hlc.Timestamp { + return w.ts +} + +var _ rangefeedbuffer.Event = &bufferEvent{} diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go new file mode 100644 index 000000000000..d5ba0fe16ab2 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go @@ -0,0 +1,18 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvsubscriber + +import "context" + +// TestingRunInner exports the inner run method for testing purposes. +func (s *KVSubscriber) TestingRunInner(ctx context.Context) error { + return s.run(ctx) +} diff --git a/pkg/spanconfig/spanconfigkvsubscriber/main_test.go b/pkg/spanconfig/spanconfigkvsubscriber/main_test.go new file mode 100644 index 000000000000..177f2e83fa44 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvsubscriber_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder.go b/pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder.go new file mode 100644 index 000000000000..76fa92e1f7a8 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder.go @@ -0,0 +1,118 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvsubscriber + +import ( + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" +) + +// spanConfigDecoder decodes rows from system.span_configurations. It's not +// safe for concurrent use. +type spanConfigDecoder struct { + alloc rowenc.DatumAlloc + colIdxMap catalog.TableColMap +} + +// newSpanConfigDecoder instantiates a spanConfigDecoder. +func newSpanConfigDecoder() *spanConfigDecoder { + return &spanConfigDecoder{ + colIdxMap: row.ColIDtoRowIndexFromCols( + systemschema.SpanConfigurationsTable.PublicColumns(), + ), + } +} + +// decode a span config entry given a KV from the +// system.span_configurations table. +func (sd *spanConfigDecoder) decode(kv roachpb.KeyValue) (entry roachpb.SpanConfigEntry, _ error) { + tbl := systemschema.SpanConfigurationsTable + // First we need to decode the start_key field from the index key. + { + types := []*types.T{tbl.PublicColumns()[0].GetType()} + startKeyRow := make([]rowenc.EncDatum, 1) + _, matches, _, err := rowenc.DecodeIndexKey( + keys.SystemSQLCodec, tbl, tbl.GetPrimaryIndex(), + types, startKeyRow, nil, kv.Key, + ) + if err != nil { + return roachpb.SpanConfigEntry{}, errors.Wrapf(err, "failed to decode key: %v", kv.Key) + } + if !matches { + return roachpb.SpanConfigEntry{}, + errors.AssertionFailedf( + "system.span_configurations descriptor does not match key: %v", kv.Key, + ) + } + if err := startKeyRow[0].EnsureDecoded(types[0], &sd.alloc); err != nil { + return roachpb.SpanConfigEntry{}, err + } + entry.Span.Key = []byte(tree.MustBeDBytes(startKeyRow[0].Datum)) + } + if !kv.Value.IsPresent() { + return roachpb.SpanConfigEntry{}, + errors.AssertionFailedf("missing value for start key: %s", entry.Span.Key) + } + + // The remaining columns are stored as a family, packed with diff-encoded + // column IDs followed by their values. + { + bytes, err := kv.Value.GetTuple() + if err != nil { + return roachpb.SpanConfigEntry{}, err + } + var colIDDiff uint32 + var lastColID descpb.ColumnID + var res tree.Datum + for len(bytes) > 0 { + _, _, colIDDiff, _, err = encoding.DecodeValueTag(bytes) + if err != nil { + return roachpb.SpanConfigEntry{}, err + } + colID := lastColID + descpb.ColumnID(colIDDiff) + lastColID = colID + if idx, ok := sd.colIdxMap.Get(colID); ok { + res, bytes, err = rowenc.DecodeTableValue(&sd.alloc, tbl.PublicColumns()[idx].GetType(), bytes) + if err != nil { + return roachpb.SpanConfigEntry{}, err + } + + switch colID { + case tbl.PublicColumns()[1].GetID(): // end_key + entry.Span.EndKey = []byte(tree.MustBeDBytes(res)) + case tbl.PublicColumns()[2].GetID(): // config + if err := protoutil.Unmarshal([]byte(tree.MustBeDBytes(res)), &entry.Config); err != nil { + return roachpb.SpanConfigEntry{}, err + } + default: + return roachpb.SpanConfigEntry{}, errors.AssertionFailedf("unknown column: %v", colID) + } + } + } + } + + return entry, nil +} + +// TestingDecoderFn exports the decoding routine for testing purposes. +func TestingDecoderFn() func(roachpb.KeyValue) (roachpb.SpanConfigEntry, error) { + return newSpanConfigDecoder().decode +} diff --git a/pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder_test.go b/pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder_test.go new file mode 100644 index 000000000000..18e56e4b704a --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder_test.go @@ -0,0 +1,128 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvsubscriber_test + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/stretchr/testify/require" +) + +// TestSpanConfigDecoder verifies that we can decode rows stored in the +// system.span_configurations table. +func TestSpanConfigDecoder(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + const dummyTableName = "dummy_span_configurations" + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + tdb.Exec(t, fmt.Sprintf("CREATE TABLE %s (LIKE system.span_configurations INCLUDING ALL)", dummyTableName)) + + var dummyTableID uint32 + tdb.QueryRow(t, fmt.Sprintf( + `SELECT table_id FROM crdb_internal.tables WHERE name = '%s'`, dummyTableName), + ).Scan(&dummyTableID) + + getCount := func() int { + q := tdb.Query(t, fmt.Sprintf(`SELECT count(*) FROM %s`, dummyTableName)) + q.Next() + var c int + require.Nil(t, q.Scan(&c)) + require.Nil(t, q.Close()) + return c + } + initialCount := getCount() + + key := tc.ScratchRange(t) + rng := tc.GetFirstStoreFromServer(t, 0).LookupReplica(keys.MustAddr(key)) + span := rng.Desc().RSpan().AsRawSpanWithNoLocals() + conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3} + + buf, err := protoutil.Marshal(&conf) + require.NoError(t, err) + tdb.Exec(t, fmt.Sprintf(`UPSERT INTO %s (start_key, end_key, config) VALUES ($1, $2, $3)`, + dummyTableName), span.Key, span.EndKey, buf) + require.Equal(t, initialCount+1, getCount()) + + k := keys.SystemSQLCodec.IndexPrefix(dummyTableID, keys.SpanConfigurationsTablePrimaryKeyIndexID) + rows, err := tc.Server(0).DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */) + require.NoError(t, err) + require.Len(t, rows, initialCount+1) + + last := rows[len(rows)-1] + got, err := spanconfigkvsubscriber.TestingDecoderFn()( + roachpb.KeyValue{ + Key: last.Key, + Value: *last.Value, + }, + ) + require.NoError(t, err) + require.Truef(t, span.Equal(got.Span), + "expected span=%s, got span=%s", span, got.Span) + require.Truef(t, conf.Equal(got.Config), + "expected config=%s, got config=%s", conf, got.Config) +} + +func BenchmarkSpanConfigDecoder(b *testing.B) { + defer log.Scope(b).Close(b) + + s, db, _ := serverutils.StartServer( + b, base.TestServerArgs{UseDatabase: "bench"}) + defer s.Stopper().Stop(context.Background()) + + ctx := context.Background() + const dummyTableName = "dummy_span_configurations" + tdb := sqlutils.MakeSQLRunner(db) + + tdb.Exec(b, `CREATE DATABASE bench`) + tdb.Exec(b, fmt.Sprintf("CREATE TABLE %s (LIKE system.span_configurations INCLUDING ALL)", dummyTableName)) + + var dummyTableID uint32 + tdb.QueryRow(b, fmt.Sprintf( + `SELECT table_id from crdb_internal.tables WHERE name = '%s'`, dummyTableName), + ).Scan(&dummyTableID) + + conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3} + buf, err := protoutil.Marshal(&conf) + require.NoError(b, err) + + tdb.Exec(b, fmt.Sprintf(`UPSERT INTO %s (start_key, end_key, config) VALUES ($1, $2, $3)`, + dummyTableName), roachpb.Key("a"), roachpb.Key("b"), buf) + + k := keys.SystemSQLCodec.IndexPrefix(dummyTableID, keys.SpanConfigurationsTablePrimaryKeyIndexID) + rows, err := s.DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */) + require.NoError(b, err) + last := rows[len(rows)-1] + decoderFn := spanconfigkvsubscriber.TestingDecoderFn() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = decoderFn(roachpb.KeyValue{ + Key: last.Key, + Value: *last.Value, + }) + } +} diff --git a/pkg/spanconfig/spanconfigkvsubscriber/testdata/basic b/pkg/spanconfig/spanconfigkvsubscriber/testdata/basic new file mode 100644 index 000000000000..676a8d5aa502 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/testdata/basic @@ -0,0 +1,48 @@ +# Test the basic control flow: writes to the span configurations table should +# appear to the subscriber. Incremental updates should as well, and the exposed +# store reader reflecting said updates appropriately. + +start +---- + +update +upsert [a,c):A +upsert [d,f):D +---- + +get +span [a,f) +---- +[a,c):A +[d,f):D + +updates +---- +/M{in-ax} +[a,c) +[d,f) + +store-reader key=a +---- +conf=A + +store-reader key=d +---- +conf=D + +store-reader compute-split=[a,e) +---- +d + +update +delete [d,f) +---- + +updates +---- +/M{in-ax} +[d,f) + +store-reader key=d +---- +conf=MISSING diff --git a/pkg/spanconfig/spanconfigkvsubscriber/testdata/buffer_overflow b/pkg/spanconfig/spanconfigkvsubscriber/testdata/buffer_overflow new file mode 100644 index 000000000000..fa7787f911e7 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/testdata/buffer_overflow @@ -0,0 +1,83 @@ +# Test the behavior of the kvsubscriber in the presence of internal +# subscription errors. During errors, the store-reader should present a snapshot +# view of the state before the error occurred. It should also be safe to +# bounce the same subscriber and have the handlers observe a [max,min) update +# indicating their view of all span configs needs to be refreshed. When +# consulting the store-reader after, it should observe a more up-to-date +# snapshot than earlier. We should also continue to observe incremental updates +# there-on-forth. + +start +---- + +update +upsert [a,c):A +upsert [d,f):D +---- + +updates +---- +/M{in-ax} +[a,c) +[d,f) + +store-reader key=a +---- +conf=A + +store-reader key=d +---- +conf=D + +# Inject a hard error. Subsequent updates aren't observed by the subscriber. The +# store-reader should also still be readable and present a snapshot of the state +# pre-error. +inject-buffer-overflow +---- + +update +upsert [a,c):B +delete [d,f) +---- + +updates +---- + +store-reader key=a +---- +conf=A + +store-reader key=d +---- +conf=D + +# Bounce the kvsubscriber. We should observe a catch-all update as a result, and +# observe a more up-to-date snapshot of the span configuration state. We should +# also receive incremental updates. +start +---- + +updates +---- +/M{in-ax} + +store-reader key=a +---- +conf=B + +store-reader key=d +---- +conf=MISSING + +update +upsert [a,c):C +---- + +updates +---- +/M{in-ax} +[a,c) + +store-reader key=a +---- +conf=C diff --git a/pkg/spanconfig/spanconfigkvsubscriber/testdata/initial_state b/pkg/spanconfig/spanconfigkvsubscriber/testdata/initial_state new file mode 100644 index 000000000000..5437421e235c --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/testdata/initial_state @@ -0,0 +1,27 @@ +# Ensure that subscribers started after certain span configs have been +# deleted/overwritten never observe earlier state. + +update +upsert [a,c):A +upsert [d,f):D +---- + +update +delete [d,f) +upsert [a,c):B +---- + +start +---- + +updates +---- +/M{in-ax} + +store-reader key=a +---- +conf=B + +store-reader key=d +---- +conf=MISSING diff --git a/pkg/spanconfig/spanconfigstore/BUILD.bazel b/pkg/spanconfig/spanconfigstore/BUILD.bazel index 5d6524640d87..a17a28eade92 100644 --- a/pkg/spanconfig/spanconfigstore/BUILD.bazel +++ b/pkg/spanconfig/spanconfigstore/BUILD.bazel @@ -11,6 +11,7 @@ go_library( deps = [ "//pkg/keys", "//pkg/roachpb:with-mocks", + "//pkg/settings", "//pkg/spanconfig", "//pkg/util/interval", "//pkg/util/log", diff --git a/pkg/spanconfig/spanconfigstore/shadow.go b/pkg/spanconfig/spanconfigstore/shadow.go index c9625f19fdeb..98a0b042318c 100644 --- a/pkg/spanconfig/spanconfigstore/shadow.go +++ b/pkg/spanconfig/spanconfigstore/shadow.go @@ -21,6 +21,10 @@ import ( // ShadowReader wraps around two spanconfig.StoreReaders and logs warnings (if // expensive logging is enabled) when there are divergent results from the two. +// +// TODO(irfansharif): This was added as a convenient way to diagnose the +// differences between span configs infrastructure and system config span. +// Remove it when we actually start issuing RPCs (#71994) have better tests. type ShadowReader struct { new, old spanconfig.StoreReader } diff --git a/pkg/spanconfig/spanconfigstore/store.go b/pkg/spanconfig/spanconfigstore/store.go index 0f3d357fd7c3..b5eb8bd50c5b 100644 --- a/pkg/spanconfig/spanconfigstore/store.go +++ b/pkg/spanconfig/spanconfigstore/store.go @@ -15,15 +15,27 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/util/interval" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) +// EnabledSetting is a hidden cluster setting to enable the use of the span +// configs infrastructure in KV. It switches each store in the cluster from +// using the gossip backed system config span to instead using the span configs +// infrastructure. It has no effect unless COCKROACH_EXPERIMENTAL_SPAN_CONFIGS +// is set. +var EnabledSetting = settings.RegisterBoolSetting( + "spanconfig.experimental_store.enabled", + `use the span config infrastructure in KV instead of the system config span`, + false, +).WithSystemOnly() + // Store is an in-memory data structure to store and retrieve span configs. // Internally it makes use of an interval tree to store non-overlapping span -// configs. +// configs. It's safe for concurrent use. type Store struct { mu struct { syncutil.RWMutex diff --git a/pkg/spanconfig/testing_knobs.go b/pkg/spanconfig/testing_knobs.go index bf498332d19e..acde9d9f34b6 100644 --- a/pkg/spanconfig/testing_knobs.go +++ b/pkg/spanconfig/testing_knobs.go @@ -10,7 +10,10 @@ package spanconfig -import "github.com/cockroachdb/cockroach/pkg/base" +import ( + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) // TestingKnobs provide fine-grained control over the various span config // components for testing. @@ -31,6 +34,25 @@ type TestingKnobs struct { // manager has checked if the auto span config reconciliation job exists or // not. ManagerAfterCheckedReconciliationJobExistsInterceptor func(exists bool) + + // KVSubscriberPostRangefeedStartInterceptor is invoked after the rangefeed is started. + KVSubscriberPostRangefeedStartInterceptor func() + + // KVSubscriberPreExitInterceptor is invoked right before returning from + // subscribeInner, after tearing down internal components. + KVSubscriberPreExitInterceptor func() + + // KVSubscriberOnTimestampAdvanceInterceptor is invoked each time the + // KVSubscriber has process all updates before the provided timestamp. + KVSubscriberOnTimestampAdvanceInterceptor func(hlc.Timestamp) + + // KVSubscriberErrorInjectionCh is a way for tests to conveniently inject + // buffer overflow errors into the subscriber in order to test recovery. + KVSubscriberErrorInjectionCh chan error + + // StoreKVSubscriberOverride is used to override the KVSubscriber used when + // setting up a new store. + StoreKVSubscriberOverride KVSubscriber } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go index 3fd5dc5fdbb1..5ddafb6fd02b 100644 --- a/pkg/sql/catalog/systemschema/system.go +++ b/pkg/sql/catalog/systemschema/system.go @@ -2245,7 +2245,14 @@ var ( ColumnIDs: []descpb.ColumnID{1, 2, 3}, }, }, - pk("start_key"), + descpb.IndexDescriptor{ + Name: "primary", + ID: keys.SpanConfigurationsTablePrimaryKeyIndexID, + Unique: true, + KeyColumnNames: []string{"start_key"}, + KeyColumnDirections: singleASC, + KeyColumnIDs: singleID1, + }, ), func(tbl *descpb.TableDescriptor) { tbl.Checks = []*descpb.TableDescriptor_CheckConstraint{{