Skip to content

Commit

Permalink
spanconfig: introduce the spanconfig.KVSubscriber
Browse files Browse the repository at this point in the history
KVSubscriber presents a consistent[^1] snapshot of a
spanconfig.StoreReader that's incrementally maintained with changes made
to the global span configurations state. The maintenance happens
transparently; callers can subscribe to learn about what key spans may
have seen a configuration change. After learning about a span update,
consulting the embedded StoreReader would retrieve an up-to-date[^2]
config for it.

When a callback is first installed, it's invoked with the [min,max) span
-- a shorthand to indicate that subscribers should consult the
StoreReader for all spans of interest. Subsequent updates are of the
more incremental kind. It's possible that the span updates received are
no-ops, i.e.  consulting the StoreReader for the given span would
retrieve the last config observed for the span[^2].

    type KVSubscriber interface {
      StoreReader
      Subscribe(func(updated roachpb.Span))
    }

It's expected to Start-ed once, after which one or many subscribers can
listen in for updates. Internally we maintain a rangefeed over the
global store of span configurations (system.span_configurations),
applying updates from it into an embedded spanconfig.Store. A read-only
view of this data structure (spanconfig.StoreReader) is exposed as part
of the KVSubscriber interface. Rangefeeds used as is don't offer any
ordering guarantees with respect to updates made over non-overlapping
keys, which is something we care about[^4]. For that reason we make use
of a rangefeed buffer, accumulating raw rangefeed updates and flushing
them out en-masse in timestamp order when the rangefeed frontier is
bumped[^5]. If the buffer overflows (as dictated by the memory limit the
KVSubscriber is instantiated with), the old rangefeed is wound down and
a new one re-established.

When running into the internal errors described above, it's safe for us
to re-establish the underlying rangefeeds. When re-establishing a new
rangefeed and populating a spanconfig.Store using the contents of the
initial scan[3], we wish to preserve the existing
spanconfig.StoreReader. Discarding it would entail either blocking all
external readers until a new spanconfig.StoreReader was fully populated,
or presenting an inconsistent view of the spanconfig.Store that's
currently being populated. For new rangefeeds what we do then is route
all updates from the initial scan to a fresh spanconfig.Store, and once
the initial scan is done, swap at the source for the exported
spanconfig.StoreReader. During the initial scan, concurrent readers
would continue to observe the last spanconfig.StoreReader if any.  After
the swap, it would observe the more up-to-date source instead. Future
incremental updates will also target the new source. When this source
swap occurs, we inform the handler of the need to possibly refresh its
view of all configs.

This commit also wires up the KVSubscriber into KV stores, replacing the
use of the gossiped system config span (possible given the StoreReader
interface, only happens if a testing flag/env var is set).

[^1]: The contents of the StoreReader at t1 corresponds exactly to the
      contents of the global span configuration state at t0 where
      t0 <= t1. If the StoreReader is read from at t2 where t2 > t1,
      it's guaranteed to observe a view of the global state at t >= t0.
[^2]: For the canonical KVSubscriber implementation, this is typically
      the closed timestamp target duration.
[^3]: The canonical KVSubscriber implementation internally
      re-establishes feeds when errors occur, possibly re-transmitting
      earlier updates (usually through a lazy [min,max) span) despite
      possibly not needing to. We could do a bit better and diff the two
      data structures, emitting only targeted updates.
[^4]: For a given key k, it's config may be stored as part of a larger
      span S (where S.start <= k < S.end). It's possible for S to get
      deleted and replaced with sub-spans S1...SN in the same
      transaction if the span is getting split. When applying these
      updates, we need to make sure to process the deletion event for S
      before processing S1...SN.
[^5]: In our example above deleting the config for S and adding configs
      for S1...SN, we want to make sure that we apply the full set of
      updates all at once -- lest we expose the intermediate state where
      the config for S was deleted but the configs for S1...SN were not
      yet applied.
[^6]: When tearing down the subscriber due to underlying errors, we
      could also surface a checkpoint to use the next time the
      subscriber is established. That way we can avoid the full initial
      scan over the span configuration state and simply pick up where we
      left off with our existing spanconfig.Store.

Release note: None
  • Loading branch information
irfansharif committed Nov 3, 2021
1 parent fde4604 commit 89f8aba
Show file tree
Hide file tree
Showing 27 changed files with 1,708 additions and 42 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ ALL_TESTS = [
"//pkg/server:server_test",
"//pkg/settings:settings_test",
"//pkg/spanconfig/spanconfigkvaccessor:spanconfigkvaccessor_test",
"//pkg/spanconfig/spanconfigkvsubscriber:spanconfigkvsubscriber_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test",
"//pkg/spanconfig/spanconfigstore:spanconfigstore_test",
Expand Down
9 changes: 5 additions & 4 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,11 @@ const (
ZonesTableConfigColumnID = 2
ZonesTableConfigColFamID = 2

DescriptorTablePrimaryKeyIndexID = 1
DescriptorTableDescriptorColID = 2
DescriptorTableDescriptorColFamID = 2
TenantsTablePrimaryKeyIndexID = 1
DescriptorTablePrimaryKeyIndexID = 1
DescriptorTableDescriptorColID = 2
DescriptorTableDescriptorColFamID = 2
TenantsTablePrimaryKeyIndexID = 1
SpanConfigurationsTablePrimaryKeyIndexID = 1

// Reserved IDs for other system tables. Note that some of these IDs refer
// to "Ranges" instead of a Table - these IDs are needed to store custom
Expand Down
3 changes: 3 additions & 0 deletions pkg/keys/spans.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ package keys
import "github.com/cockroachdb/cockroach/pkg/roachpb"

var (
// EverythingSpan is a span that covers everything.
EverythingSpan = roachpb.Span{Key: roachpb.KeyMin, EndKey: roachpb.KeyMax}

// Meta1Span holds all first level addressing records.
Meta1Span = roachpb.Span{Key: roachpb.KeyMin, EndKey: Meta2Prefix}

Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ func (f *RangeFeed) Start(ctx context.Context) error {
return nil
}

// Close closes the RangeFeed and waits for it to shut down; it does
// idempotently. It's guaranteed that no future handlers will be invoked after
// this point.
// Close closes the RangeFeed and waits for it to shut down; it does so
// idempotently. It waits for the currently running handler, if any, to complete
// and guarantees that no future handlers will be invoked after this point.
func (f *RangeFeed) Close() {
f.closeOnce.Do(func() {
f.cancel()
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigstore",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/storage",
Expand Down Expand Up @@ -218,6 +219,7 @@ go_test(
"client_replica_backpressure_test.go",
"client_replica_gc_test.go",
"client_replica_test.go",
"client_spanconfigs_test.go",
"client_split_burst_test.go",
"client_split_test.go",
"client_status_test.go",
Expand Down Expand Up @@ -340,6 +342,7 @@ go_test(
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigstore",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
Expand All @@ -349,6 +352,8 @@ go_test(
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
Expand Down
119 changes: 119 additions & 0 deletions pkg/kv/kvserver/client_spanconfigs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestSpanConfigUpdateAppliedToReplica ensures that when a store learns of a
// span config update, it installs the corresponding config on the right
// replica.
func TestSpanConfigUpdateAppliedToReplica(t *testing.T) {
defer leaktest.AfterTest(t)()

spanConfigStore := spanconfigstore.New(roachpb.TestingDefaultSpanConfig())
mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore)

ctx := context.Background()

args := base.TestServerArgs{
EnableSpanConfigs: true,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
},
SpanConfig: &spanconfig.TestingKnobs{
StoreKVSubscriberOverride: mockSubscriber,
},
},
}
s, _, _ := serverutils.StartServer(t, args)
defer s.Stopper().Stop(context.Background())

_, err := s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", nil,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
`SET CLUSTER SETTING spanconfig.experimental_store.enabled = true`)
require.NoError(t, err)

key, err := s.ScratchRange()
require.NoError(t, err)
store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)
repl := store.LookupReplica(keys.MustAddr(key))
span := repl.Desc().RSpan().AsRawSpanWithNoLocals()
conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3}

deleted, added := spanConfigStore.Apply(ctx, spanconfig.Update{Span: span, Config: conf}, false /* dryrun */)
require.Empty(t, deleted)
require.Len(t, added, 1)
require.True(t, added[0].Span.Equal(span))
require.True(t, added[0].Config.Equal(conf))

require.NotNil(t, mockSubscriber.callback)
mockSubscriber.callback(span) // invoke the callback
testutils.SucceedsSoon(t, func() error {
repl := store.LookupReplica(keys.MustAddr(key))
gotConfig := repl.SpanConfig()
if !gotConfig.Equal(conf) {
return errors.Newf("expected config=%s, got config=%s", conf.String(), gotConfig.String())
}
return nil
})
}

func newMockSpanConfigSubscriber(store spanconfig.Store) *mockSpanConfigSubscriber {
return &mockSpanConfigSubscriber{Store: store}
}

type mockSpanConfigSubscriber struct {
callback func(config roachpb.Span)
spanconfig.Store
}

func (m *mockSpanConfigSubscriber) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool {
return m.Store.NeedsSplit(ctx, start, end)
}

func (m *mockSpanConfigSubscriber) ComputeSplitKey(
ctx context.Context, start, end roachpb.RKey,
) roachpb.RKey {
return m.Store.ComputeSplitKey(ctx, start, end)
}

func (m *mockSpanConfigSubscriber) GetSpanConfigForKey(
ctx context.Context, key roachpb.RKey,
) (roachpb.SpanConfig, error) {
return m.Store.GetSpanConfigForKey(ctx, key)
}

func (m *mockSpanConfigSubscriber) Subscribe(callback func(roachpb.Span)) {
m.callback = callback
}

var _ spanconfig.KVSubscriber = &mockSpanConfigSubscriber{}
Loading

0 comments on commit 89f8aba

Please sign in to comment.