From 0c9f64f315abee0e4baf98ebb5fd1fc408d5d994 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 16 Feb 2021 10:38:30 -0500 Subject: [PATCH 1/8] stop/stopper: use sync.Map for contexts to allow canceling contexts in close Before this commit, if a Closer called cancel on a context which was attached to the stopper, it would result in a deadlock as Close is called under the same mutex that the cancel method was trying to lock. This exchanges that map for an atomic map. Release note: None --- pkg/util/stop/stopper.go | 31 ++++++++++++++++--------------- pkg/util/stop/stopper_test.go | 18 ++++++++++++++++++ 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index b09ba5d3a901..5f0816d703a0 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -15,6 +15,7 @@ import ( "fmt" "net/http" "runtime/debug" + "sync" "sync/atomic" "testing" "time" @@ -173,8 +174,11 @@ type Stopper struct { // should execute immediately. quiescing, stopping bool closers []Closer - idAlloc int // allocates index into qCancels - qCancels map[int]func() // ctx cancels to be called on Quiesce + + // idAlloc is incremented atomically under the read lock when adding a + // context to be canceled. 
+ idAlloc int64 // allocates index into qCancels + qCancels sync.Map } } @@ -205,8 +209,6 @@ func NewStopper(options ...Option) *Stopper { stopped: make(chan struct{}), } - s.mu.qCancels = map[int]func(){} - for _, opt := range options { opt.apply(s) } @@ -272,20 +274,17 @@ func (s *Stopper) WithCancelOnQuiesce(ctx context.Context) (context.Context, fun func (s *Stopper) withCancel(ctx context.Context) (context.Context, func()) { var cancel func() ctx, cancel = context.WithCancel(ctx) - s.mu.Lock() - defer s.mu.Unlock() + s.mu.RLock() + defer s.mu.RUnlock() if s.refuseRLocked() { cancel() return ctx, func() {} } - id := s.mu.idAlloc - s.mu.idAlloc++ - s.mu.qCancels[id] = cancel + id := atomic.AddInt64(&s.mu.idAlloc, 1) + s.mu.qCancels.Store(id, cancel) return ctx, func() { cancel() - s.mu.Lock() - defer s.mu.Unlock() - delete(s.mu.qCancels, id) + s.mu.qCancels.Delete(id) } } @@ -516,10 +515,12 @@ func (s *Stopper) Quiesce(ctx context.Context) { close(s.quiescer) } - for _, cancel := range s.mu.qCancels { + s.mu.qCancels.Range(func(k, v interface{}) (wantMore bool) { + cancel := v.(func()) cancel() - } - s.mu.qCancels = nil + s.mu.qCancels.Delete(k) + return true + }) }() for s.NumTasks() > 0 { diff --git a/pkg/util/stop/stopper_test.go b/pkg/util/stop/stopper_test.go index a7f881bc6572..2dad70ae9b6b 100644 --- a/pkg/util/stop/stopper_test.go +++ b/pkg/util/stop/stopper_test.go @@ -637,3 +637,21 @@ func BenchmarkStopperPar(b *testing.B) { } }) } + +func TestCancelInCloser(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + s := stop.NewStopper() + defer s.Stop(ctx) + + // This will call the Closer which will call cancel and should + // not deadlock. + _, cancel := s.WithCancelOnQuiesce(ctx) + s.AddCloser(closerFunc(cancel)) + s.Stop(ctx) +} + +// closerFunc implements Closer. 
+type closerFunc func() + +func (cf closerFunc) Close() { cf() } From a2b71d1941f35baf1fce852f074a0b19da1f1c5b Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 29 Dec 2020 00:29:40 -0500 Subject: [PATCH 2/8] kvclient/rangefeed: introduce package for using rangefeeds This commit introduces a new package underneath kvclient to simplify using rangefeeds. Namely, it provides sane retry and shutdown logic as well as support for performing an initial data scan. It also hides the nasty unwrapping of the DistSender as performed elsewhere. A longer-term vision here would be to support multiple spans and utilize this layer in the `kvfeed` of `changefeedccl`. Release note: None --- pkg/BUILD.bazel | 1 + pkg/kv/kvclient/rangefeed/BUILD.bazel | 56 +++ pkg/kv/kvclient/rangefeed/config.go | 88 +++++ pkg/kv/kvclient/rangefeed/db_adapter.go | 97 ++++++ .../rangefeed/db_adapter_external_test.go | 87 +++++ pkg/kv/kvclient/rangefeed/doc.go | 24 ++ pkg/kv/kvclient/rangefeed/helpers_test.go | 25 ++ pkg/kv/kvclient/rangefeed/main_test.go | 34 ++ pkg/kv/kvclient/rangefeed/rangefeed.go | 296 ++++++++++++++++ .../rangefeed/rangefeed_external_test.go | 108 ++++++ .../kvclient/rangefeed/rangefeed_mock_test.go | 328 ++++++++++++++++++ 11 files changed, 1144 insertions(+) create mode 100644 pkg/kv/kvclient/rangefeed/BUILD.bazel create mode 100644 pkg/kv/kvclient/rangefeed/config.go create mode 100644 pkg/kv/kvclient/rangefeed/db_adapter.go create mode 100644 pkg/kv/kvclient/rangefeed/db_adapter_external_test.go create mode 100644 pkg/kv/kvclient/rangefeed/doc.go create mode 100644 pkg/kv/kvclient/rangefeed/helpers_test.go create mode 100644 pkg/kv/kvclient/rangefeed/main_test.go create mode 100644 pkg/kv/kvclient/rangefeed/rangefeed.go create mode 100644 pkg/kv/kvclient/rangefeed/rangefeed_external_test.go create mode 100644 pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 7563a924ea27..16984357c280 100644 --- a/pkg/BUILD.bazel +++ 
b/pkg/BUILD.bazel @@ -78,6 +78,7 @@ ALL_TESTS = [ "//pkg/kv/bulk:bulk_test", "//pkg/kv/kvclient/kvcoord:kvcoord_test", "//pkg/kv/kvclient/rangecache:rangecache_test", + "//pkg/kv/kvclient/rangefeed:rangefeed_test", "//pkg/kv/kvnemesis:kvnemesis_test", "//pkg/kv/kvserver/abortspan:abortspan_test", "//pkg/kv/kvserver/apply:apply_test", diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel new file mode 100644 index 000000000000..f1e94ac54a84 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -0,0 +1,56 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "rangefeed", + srcs = [ + "config.go", + "db_adapter.go", + "doc.go", + "rangefeed.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv", + "//pkg/kv/kvclient/kvcoord", + "//pkg/roachpb", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/retry", + "//pkg/util/span", + "//pkg/util/stop", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_logtags//:logtags", + ], +) + +go_test( + name = "rangefeed_test", + srcs = [ + "db_adapter_external_test.go", + "helpers_test.go", + "main_test.go", + "rangefeed_external_test.go", + "rangefeed_mock_test.go", + ], + embed = [":rangefeed"], + deps = [ + "//pkg/base", + "//pkg/roachpb", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/encoding", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/retry", + "//pkg/util/stop", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go new file mode 100644 index 000000000000..73ea4d522ff3 --- /dev/null +++ 
b/pkg/kv/kvclient/rangefeed/config.go @@ -0,0 +1,88 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/util/retry" +) + +// Option configures a RangeFeed. +type Option interface { + set(*config) +} + +type config struct { + retryOptions retry.Options + onInitialScanDone OnInitialScanDone + withInitialScan bool + withDiff bool + onInitialScanError OnInitialScanError +} + +type optionFunc func(*config) + +func (o optionFunc) set(c *config) { o(c) } + +// OnInitialScanDone is called when an initial scan is finished before any rows +// from the rangefeed are supplied. +type OnInitialScanDone func(ctx context.Context) + +// OnInitialScanError is called when an initial scan encounters an error. It +// allows the caller to tell the RangeFeed to stop as opposed to retrying +// endlessly. +type OnInitialScanError func(ctx context.Context, err error) (shouldFail bool) + +// WithInitialScan enables an initial scan of the data in the span. The rows of +// an initial scan will be passed to the value function used to construct the +// RangeFeed. Upon completion of the initial scan, the passed function (if +// non-nil) will be called. The initial scan may be restarted and thus rows +// may be observed multiple times. The caller cannot rely on rows being returned +// in order. +func WithInitialScan(f OnInitialScanDone) Option { + return optionFunc(func(c *config) { + c.withInitialScan = true + c.onInitialScanDone = f + }) +} + +// WithOnInitialScanError sets up a callback to report errors during the initial +// scan to the caller. 
The caller may instruct the RangeFeed to halt rather than +// retrying endlessly. This option will not enable an initial scan; it must be +// used in conjunction with WithInitialScan to have any effect. +func WithOnInitialScanError(f OnInitialScanError) Option { + return optionFunc(func(c *config) { + c.onInitialScanError = f + }) +} + +// WithDiff makes an option to request diffs (previous values) from the +// rangefeed, which defaults to false. +func WithDiff() Option { + return optionFunc(func(c *config) { + c.withDiff = true + }) +} + +// WithRetry configures the retry options for the rangefeed. +func WithRetry(options retry.Options) Option { + return optionFunc(func(c *config) { + c.retryOptions = options + }) +} + +func initConfig(c *config, options []Option) { + *c = config{} // the default config is its zero value + for _, o := range options { + o.set(c) + } +} diff --git a/pkg/kv/kvclient/rangefeed/db_adapter.go b/pkg/kv/kvclient/rangefeed/db_adapter.go new file mode 100644 index 000000000000..5e1cbfaad504 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/db_adapter.go @@ -0,0 +1,97 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// dbAdapter is an implementation of the kvDB interface using a real *kv.DB. 
+type dbAdapter struct { + db *kv.DB + distSender *kvcoord.DistSender + targetScanBytes int64 +} + +var _ kvDB = (*dbAdapter)(nil) + +// TODO(ajwerner): Hook up a memory monitor. Fortunately most users of the +// initial scan are reading scant amounts of data. + +// defaultTargetScanBytes was pulled out of thin air. The main reason is that +// this thing is not hooked up to a memory monitor. +const defaultTargetScanBytes = 1 << 19 // 512 KiB + +// newDBAdapter constructs a kvDB using a *kv.DB. +func newDBAdapter(db *kv.DB) (*dbAdapter, error) { + var distSender *kvcoord.DistSender + { + txnWrapperSender, ok := db.NonTransactionalSender().(*kv.CrossRangeTxnWrapperSender) + if !ok { + return nil, errors.Errorf("failed to extract a %T from %T", + (*kv.CrossRangeTxnWrapperSender)(nil), db.NonTransactionalSender()) + } + distSender, ok = txnWrapperSender.Wrapped().(*kvcoord.DistSender) + if !ok { + return nil, errors.Errorf("failed to extract a %T from %T", + (*kvcoord.DistSender)(nil), txnWrapperSender.Wrapped()) + } + } + return &dbAdapter{ + db: db, + distSender: distSender, + targetScanBytes: defaultTargetScanBytes, + }, nil +} + +// RangeFeed is part of the kvDB interface. +func (dbc *dbAdapter) RangeFeed( + ctx context.Context, + span roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + eventC chan<- *roachpb.RangeFeedEvent, +) error { + return dbc.distSender.RangeFeed(ctx, span, startFrom, withDiff, eventC) +} + +// Scan is part of the kvDB interface. 
+func (dbc *dbAdapter) Scan( + ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), +) error { + return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetFixedTimestamp(ctx, asOf) + sp := span + var b kv.Batch + for { + b.Header.TargetBytes = dbc.targetScanBytes + b.Scan(sp.Key, sp.EndKey) + if err := txn.Run(ctx, &b); err != nil { + return err + } + res := b.Results[0] + for _, row := range res.Rows { + rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value}) + } + if res.ResumeSpan == nil { + return nil + } + sp = res.ResumeSpanAsValue() + b = kv.Batch{} + } + }) +} diff --git a/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go b/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go new file mode 100644 index 000000000000..69c912089912 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go @@ -0,0 +1,87 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestDBClientScan tests that the logic in Scan on the dbAdapter is sane. +// The rangefeed logic is a literal passthrough so it's not getting a lot of +// testing directly. 
+func TestDBClientScan(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.Server(0).DB() + beforeAny := db.Clock().Now() + scratchKey := tc.ScratchRange(t) + mkKey := func(k string) roachpb.Key { + return encoding.EncodeStringAscending(scratchKey, k) + } + require.NoError(t, db.Put(ctx, mkKey("a"), 1)) + require.NoError(t, db.Put(ctx, mkKey("b"), 2)) + afterB := db.Clock().Now() + require.NoError(t, db.Put(ctx, mkKey("c"), 3)) + + dba, err := rangefeed.NewDBAdapter(db) + require.NoError(t, err) + sp := roachpb.Span{ + Key: scratchKey, + EndKey: scratchKey.PrefixEnd(), + } + + // Ensure that the timestamps are properly respected by not observing any + // values at the timestamp preceding writes. + { + var responses []roachpb.KeyValue + require.NoError(t, dba.Scan(ctx, sp, beforeAny, func(value roachpb.KeyValue) { + responses = append(responses, value) + })) + require.Len(t, responses, 0) + } + + // Ensure that expected values are seen at the intermediate timestamp. + { + var responses []roachpb.KeyValue + require.NoError(t, dba.Scan(ctx, sp, afterB, func(value roachpb.KeyValue) { + responses = append(responses, value) + })) + require.Len(t, responses, 2) + require.Equal(t, mkKey("a"), responses[0].Key) + va, err := responses[0].Value.GetInt() + require.NoError(t, err) + require.Equal(t, int64(1), va) + } + + // Ensure that pagination doesn't break anything. 
+ dba.SetTargetScanBytes(1) + { + var responses []roachpb.KeyValue + require.NoError(t, dba.Scan(ctx, sp, db.Clock().Now(), func(value roachpb.KeyValue) { + responses = append(responses, value) + })) + require.Len(t, responses, 3) + } + +} diff --git a/pkg/kv/kvclient/rangefeed/doc.go b/pkg/kv/kvclient/rangefeed/doc.go new file mode 100644 index 000000000000..59d40f13329a --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/doc.go @@ -0,0 +1,24 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package rangefeed provides a useful client abstraction atop the rangefeed +// functionality exported by the DistSender. +// +// In particular, the abstraction exported by this package hooks up a stopper, +// and deals with retries upon errors, tracking resolved timestamps along the +// way. +package rangefeed + +// TODO(ajwerner): Rework this logic to encapsulate the multi-span logic in +// changefeedccl/kvfeed. That code also deals with some schema interactions but +// it should be split into two layers. The primary capability missing here is +// just the ability to watch multiple spans; however, the way that the KV feed +// manages internal state and sometimes triggers re-scanning would require some +// interface changes. diff --git a/pkg/kv/kvclient/rangefeed/helpers_test.go b/pkg/kv/kvclient/rangefeed/helpers_test.go new file mode 100644 index 000000000000..51ec80ded20b --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/helpers_test.go @@ -0,0 +1,25 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +// NewDBAdapter allows tests to construct a dbAdapter. +var NewDBAdapter = newDBAdapter + +// NewFactoryWithDB allows tests to construct a factory with an injected db. +var NewFactoryWithDB = newFactory + +// KVDB forwards the definition of kvDB to tests. +type KVDB = kvDB + +// SetTargetScanBytes is exposed for testing. +func (dbc *dbAdapter) SetTargetScanBytes(limit int64) { + dbc.targetScanBytes = limit +} diff --git a/pkg/kv/kvclient/rangefeed/main_test.go b/pkg/kv/kvclient/rangefeed/main_test.go new file mode 100644 index 000000000000..1e7e15018cfc --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/main_test.go @@ -0,0 +1,34 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package rangefeed_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func init() { + security.SetAssetLoader(securitytest.EmbeddedAssets) +} + +func TestMain(m *testing.M) { + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go new file mode 100644 index 000000000000..2433d96f4213 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -0,0 +1,296 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "sync" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/span" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/logtags" +) + +// TODO(ajwerner): Expose hooks for metrics. +// TODO(ajwerner): Expose access to checkpoints and the frontier. +// TODO(ajwerner): Expose better control over how the retrier gets reset. 
+ +// kvDB is an adapter to the underlying KV store. +type kvDB interface { + + // RangeFeed runs a rangefeed on a given span with the given arguments. + // It encapsulates the RangeFeed method on roachpb.Internal. + RangeFeed( + ctx context.Context, + span roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + eventC chan<- *roachpb.RangeFeedEvent, + ) error + + // Scan encapsulates scanning a keyspan at a given point in time. The method + // deals with pagination, calling the caller back for each row. Note that + // the API does not require that the rows be ordered to allow for future + // parallelism. + Scan( + ctx context.Context, + span roachpb.Span, + asOf hlc.Timestamp, + rowFn func(value roachpb.KeyValue), + ) error +} + +// Factory is used to construct RangeFeeds. +type Factory struct { + stopper *stop.Stopper + client kvDB +} + +// NewFactory constructs a new Factory. +func NewFactory(stopper *stop.Stopper, db *kv.DB) (*Factory, error) { + kvDB, err := newDBAdapter(db) + if err != nil { + return nil, err + } + return newFactory(stopper, kvDB), nil +} + +func newFactory(stopper *stop.Stopper, client kvDB) *Factory { + return &Factory{ + stopper: stopper, + client: client, + } +} + +// RangeFeed constructs a new RangeFeed. The only error which can be returned +// will indicate that the server is being shut down. 
+func (f *Factory) RangeFeed( + ctx context.Context, + name string, + span roachpb.Span, + initialTimestamp hlc.Timestamp, + onValue func(ctx context.Context, value *roachpb.RangeFeedValue), + options ...Option, +) (_ *RangeFeed, err error) { + r := RangeFeed{ + client: f.client, + stopper: f.stopper, + + initialTimestamp: initialTimestamp, + span: span, + onValue: onValue, + + stopped: make(chan struct{}), + } + initConfig(&r.config, options) + ctx = logtags.AddTag(ctx, "rangefeed", name) + ctx, r.cancel = f.stopper.WithCancelOnQuiesce(ctx) + if err := f.stopper.RunAsyncTask(ctx, "rangefeed", r.run); err != nil { + r.cancel() + return nil, err + } + return &r, nil +} + +// RangeFeed represents a running RangeFeed. +type RangeFeed struct { + config + client kvDB + stopper *stop.Stopper + + initialTimestamp hlc.Timestamp + + span roachpb.Span + onValue func(ctx context.Context, value *roachpb.RangeFeedValue) + + closeOnce sync.Once + cancel context.CancelFunc + stopped chan struct{} +} + +// Close closes the RangeFeed and waits for it to shut down. +// Close is idempotent. +func (f *RangeFeed) Close() { + f.closeOnce.Do(func() { + f.cancel() + <-f.stopped + }) +} + +// Run the rangefeed in a loop in the case of failure, likely due to node +// failures or general unavailability. If the rangefeed runs successfully for at +// least this long, then after subsequent failures we would like to reset the +// exponential backoff to avoid long delays between retry attempts. +// This is the threshold of successful running after which the backoff state +// will be reset. +const resetThreshold = 30 * time.Second + +// run will run the RangeFeed until the context is canceled or if the client +// indicates that an initial scan error is non-recoverable. 
+func (f *RangeFeed) run(ctx context.Context) { + defer close(f.stopped) + r := retry.StartWithCtx(ctx, f.retryOptions) + restartLogEvery := log.Every(10 * time.Second) + if done := f.maybeRunInitialScan(ctx, &restartLogEvery, &r); done { + return + } + + // Check the context before kicking off a rangefeed. + if ctx.Err() != nil { + return + } + + // TODO(ajwerner): Consider adding event buffering. Doing so would require + // draining when the rangefeed fails. + eventCh := make(chan *roachpb.RangeFeedEvent) + errCh := make(chan error) + + // Maintain a frontier in order to resume at a reasonable timestamp. + // TODO(ajwerner): Consider exposing the frontier through a RangeFeed method. + // Doing so would require some synchronization. + frontier := span.MakeFrontier(f.span) + frontier.Forward(f.span, f.initialTimestamp) + for i := 0; r.Next(); i++ { + + // TODO(ajwerner): Figure out what to do if the rangefeed falls behind to + // a point where the frontier timestamp precedes the GC threshold and thus + // will never work. Perhaps an initial scan could be performed again for + // some users. The API currently doesn't make that easy. Perhaps a callback + // should be called in order to allow the client to kill the process or + // something like that. + ts := frontier.Frontier() + log.VEventf(ctx, 1, "starting rangefeed from %v on %v", ts, f.span) + start := timeutil.Now() + + // Note that the below channel send will not block forever because + // processEvents will wait for the task to send. RunAsyncTask is safe here + // because processEvents is guaranteed to consume the error before + // returning. 
+ if err := f.stopper.RunAsyncTask(ctx, "rangefeed", func(ctx context.Context) { + errCh <- f.client.RangeFeed(ctx, f.span, ts, f.withDiff, eventCh) + }); err != nil { + log.VEventf(ctx, 1, "exiting rangefeed due to stopper") + return + } + + err := f.processEvents(ctx, frontier, eventCh, errCh) + if err != nil && ctx.Err() == nil && restartLogEvery.ShouldLog() { + log.Warningf(ctx, "rangefeed failed %d times, restarting: %v", + log.Safe(i), err) + } + if ctx.Err() != nil { + log.VEventf(ctx, 1, "exiting rangefeed") + return + } + + ranFor := timeutil.Since(start) + log.VEventf(ctx, 1, "restarting rangefeed for %v after %v", + log.Safe(f.span), ranFor) + + // If the rangefeed ran successfully for long enough, reset the retry + // state so that the exponential backoff begins from its minimum value. + if ranFor > resetThreshold { + i = 1 + r.Reset() + } + } +} + +// maybeRunInitialScan will attempt to perform an initial data scan if one was +// requested. It will retry in the face of errors and will only return upon +// success, context cancellation, or an error handling function which indicates +// that an error is unrecoverable. The return value will be true if the context +// was canceled or if the OnInitialScanError function indicated that the +// RangeFeed should stop. +func (f *RangeFeed) maybeRunInitialScan( + ctx context.Context, n *log.EveryN, r *retry.Retry, +) (canceled bool) { + if !f.withInitialScan { + return false // canceled + } + scan := func(kv roachpb.KeyValue) { + v := roachpb.RangeFeedValue{ + Key: kv.Key, + Value: kv.Value, + } + + // Mark the data as occurring at the initial timestamp, which is the + // timestamp at which it was read. + v.Value.Timestamp = f.initialTimestamp + + // Supply the value from the scan as also the previous value to avoid + // indicating that the value was previously deleted. 
+ if f.withDiff { + v.PrevValue = v.Value + } + + // It's something of a bummer that we must allocate a new value for each + // of these but the contract doesn't indicate that the value cannot be + // retained so we have to assume that the callback may retain the value. + f.onValue(ctx, &v) + } + for r.Next() { + if err := f.client.Scan(ctx, f.span, f.initialTimestamp, scan); err != nil { + if f.onInitialScanError != nil { + if shouldStop := f.onInitialScanError(ctx, err); shouldStop { + log.VEventf(ctx, 1, "stopping due to error: %v", err) + return true + } + } + if n.ShouldLog() { + log.Warningf(ctx, "failed to perform initial scan: %v", err) + } + } else /* err == nil */ { + if f.onInitialScanDone != nil { + f.onInitialScanDone(ctx) + } + break + } + } + return ctx.Err() != nil // canceled +} + +// processEvents processes events sent by the rangefeed on the eventCh. It waits +// for the rangefeed to signal that it has exited by sending on errCh. +func (f *RangeFeed) processEvents( + ctx context.Context, + frontier *span.Frontier, + eventCh <-chan *roachpb.RangeFeedEvent, + errCh <-chan error, +) error { + for { + select { + case ev := <-eventCh: + switch { + case ev.Val != nil: + f.onValue(ctx, ev.Val) + case ev.Checkpoint != nil: + frontier.Forward(ev.Checkpoint.Span, ev.Checkpoint.ResolvedTS) + case ev.Error != nil: + // Intentionally do nothing, we'll get an error returned from the + // call to RangeFeed. + } + case <-ctx.Done(): + // Ensure that the RangeFeed goroutine stops. + <-errCh + return ctx.Err() + case err := <-errCh: + return err + } + } +} diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go new file mode 100644 index 000000000000..ba661558b54a --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -0,0 +1,108 @@ +// Copyright 2020 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestRangeFeedIntegration is a basic integration test demonstrating all of +// the pieces working together. +func TestRangeFeedIntegration(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.Server(0).DB() + scratchKey := tc.ScratchRange(t) + scratchKey = scratchKey[:len(scratchKey):len(scratchKey)] + mkKey := func(k string) roachpb.Key { + return encoding.EncodeStringAscending(scratchKey, k) + } + // Split the range a bunch of times. + const splits = 10 + for i := 0; i < splits; i++ { + _, _, err := tc.SplitRange(mkKey(string([]byte{'a' + byte(i)}))) + require.NoError(t, err) + } + + require.NoError(t, db.Put(ctx, mkKey("a"), 1)) + require.NoError(t, db.Put(ctx, mkKey("b"), 2)) + afterB := db.Clock().Now() + require.NoError(t, db.Put(ctx, mkKey("c"), 3)) + + sp := roachpb.Span{ + Key: scratchKey, + EndKey: scratchKey.PrefixEnd(), + } + { + // Enable rangefeeds, otherwise the thing will retry until they are enabled. 
+ _, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true") + require.NoError(t, err) + } + + f, err := rangefeed.NewFactory(tc.Stopper(), db) + require.NoError(t, err) + rows := make(chan *roachpb.RangeFeedValue) + initialScanDone := make(chan struct{}) + r, err := f.RangeFeed(ctx, "test", sp, afterB, func( + ctx context.Context, value *roachpb.RangeFeedValue, + ) { + select { + case rows <- value: + case <-ctx.Done(): + } + }, rangefeed.WithDiff(), rangefeed.WithInitialScan(func(ctx context.Context) { + close(initialScanDone) + })) + require.NoError(t, err) + defer r.Close() + { + v1 := <-rows + require.Equal(t, mkKey("a"), v1.Key) + // Ensure the initial scan contract is fulfilled when WithDiff is specified. + require.Equal(t, v1.Value, v1.PrevValue) + require.Equal(t, v1.Value.Timestamp, afterB) + } + { + v2 := <-rows + require.Equal(t, mkKey("b"), v2.Key) + } + <-initialScanDone + { + v3 := <-rows + require.Equal(t, mkKey("c"), v3.Key) + } + + // Write a new value for "a" and make sure it is seen. + require.NoError(t, db.Put(ctx, mkKey("a"), 4)) + { + v4 := <-rows + require.Equal(t, mkKey("a"), v4.Key) + prev, err := v4.PrevValue.GetInt() + require.NoError(t, err) + require.Equal(t, int64(1), prev) + updated, err := v4.Value.GetInt() + require.NoError(t, err) + require.Equal(t, int64(4), updated) + } +} diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go new file mode 100644 index 000000000000..12d764f86fb2 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go @@ -0,0 +1,328 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package rangefeed_test + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockClient struct { + rangefeed func( + ctx context.Context, + span roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + eventC chan<- *roachpb.RangeFeedEvent, + ) error + + scan func( + ctx context.Context, + span roachpb.Span, + asOf hlc.Timestamp, + rowFn func(value roachpb.KeyValue), + ) error +} + +func (m *mockClient) RangeFeed( + ctx context.Context, + span roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + eventC chan<- *roachpb.RangeFeedEvent, +) error { + return m.rangefeed(ctx, span, startFrom, withDiff, eventC) +} + +func (m *mockClient) Scan( + ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), +) error { + return m.scan(ctx, span, asOf, rowFn) +} + +var _ (rangefeed.KVDB) = (*mockClient)(nil) + +// TestRangeFeedMock utilizes the kvDB interface to test the behavior of the +// RangeFeed. 
+func TestRangeFeedMock(t *testing.T) { + defer leaktest.AfterTest(t)() + shortRetryOptions := retry.Options{ + InitialBackoff: time.Millisecond, + MaxBackoff: 2 * time.Millisecond, + } + t.Run("scan retries", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + ctx, cancel := context.WithCancel(ctx) + var i int + sp := roachpb.Span{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("b"), + } + ts := hlc.Timestamp{WallTime: 1} + row := roachpb.KeyValue{ + Key: sp.Key, + Value: roachpb.Value{}, + } + const numFailures = 2 + mc := mockClient{ + scan: func(ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue)) error { + assert.Equal(t, ts, asOf) + assert.Equal(t, sp, span) + rowFn(row) + if i++; i <= numFailures { + return errors.New("boom") + } + // Ensure the rangefeed doesn't start up by canceling the context prior + // to concluding the scan. + cancel() + return nil + }, + } + f := rangefeed.NewFactoryWithDB(stopper, &mc) + require.NotNil(t, f) + rows := make(chan *roachpb.RangeFeedValue) + + r, err := f.RangeFeed(ctx, "foo", sp, ts, func(ctx context.Context, value *roachpb.RangeFeedValue) { + rows <- value + }, rangefeed.WithInitialScan(func(ctx context.Context) { + close(rows) + }), rangefeed.WithRetry(shortRetryOptions)) + require.NoError(t, err) + require.NotNil(t, r) + for i := 0; i < numFailures+1; i++ { + r, ok := <-rows + require.Equal(t, row.Key, r.Key) + require.True(t, ok) + } + _, ok := <-rows + require.False(t, ok) + r.Close() + }) + t.Run("changefeed retries", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + sp := roachpb.Span{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("c"), + } + initialTS := hlc.Timestamp{WallTime: 1} + nextTS := initialTS.Next() + lastTS := nextTS.Next() + row := roachpb.KeyValue{ + Key: sp.Key, + Value: roachpb.Value{}, + } + const ( + numRestartsBeforeCheckpoint = 3 + 
firstPartialCheckpoint = numRestartsBeforeCheckpoint + 1 + secondPartialCheckpoint = firstPartialCheckpoint + 1 + fullCheckpoint = secondPartialCheckpoint + 1 + lastEvent = fullCheckpoint + 1 + totalRestarts = lastEvent - 1 + ) + var iteration int + var gotToTheEnd bool + mc := mockClient{ + scan: func( + ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), + ) error { + t.Error("this should not be called") + return nil + }, + rangefeed: func( + ctx context.Context, span roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- *roachpb.RangeFeedEvent, + ) error { + assert.False(t, withDiff) // it was not set + sendEvent := func(ts hlc.Timestamp) { + eventC <- &roachpb.RangeFeedEvent{ + Val: &roachpb.RangeFeedValue{ + Key: sp.Key, + }, + } + } + iteration++ + switch { + case iteration <= numRestartsBeforeCheckpoint: + sendEvent(initialTS) + assert.Equal(t, startFrom, initialTS) + return errors.New("boom") + case iteration == firstPartialCheckpoint: + assert.Equal(t, startFrom, initialTS) + eventC <- &roachpb.RangeFeedEvent{ + Checkpoint: &roachpb.RangeFeedCheckpoint{ + Span: roachpb.Span{ + Key: sp.Key, + EndKey: sp.Key.PrefixEnd(), + }, + ResolvedTS: nextTS, + }, + } + sendEvent(initialTS) + return errors.New("boom") + case iteration == secondPartialCheckpoint: + assert.Equal(t, startFrom, initialTS) + eventC <- &roachpb.RangeFeedEvent{ + Checkpoint: &roachpb.RangeFeedCheckpoint{ + Span: roachpb.Span{ + Key: sp.Key.PrefixEnd(), + EndKey: sp.EndKey, + }, + ResolvedTS: nextTS, + }, + } + sendEvent(nextTS) + return errors.New("boom") + case iteration == fullCheckpoint: + // At this point the frontier should have a complete checkpoint at + // nextTS. + assert.Equal(t, startFrom, nextTS) + eventC <- &roachpb.RangeFeedEvent{ + Checkpoint: &roachpb.RangeFeedCheckpoint{ + Span: sp, + ResolvedTS: lastTS, + }, + } + sendEvent(nextTS) + return errors.New("boom") + case iteration == lastEvent: + // Send a last event. 
+ sendEvent(lastTS) + gotToTheEnd = true + <-ctx.Done() + return ctx.Err() + default: + panic(iteration) + } + }, + } + f := rangefeed.NewFactoryWithDB(stopper, &mc) + rows := make(chan *roachpb.RangeFeedValue) + r, err := f.RangeFeed(ctx, "foo", sp, initialTS, func( + ctx context.Context, value *roachpb.RangeFeedValue, + ) { + rows <- value + }, rangefeed.WithRetry(shortRetryOptions)) + require.NoError(t, err) + require.NotNil(t, r) + start := timeutil.Now() + for i := 0; i < lastEvent; i++ { + r := <-rows + assert.Equal(t, row.Key, r.Key) + } + minimumBackoff := 850 * time.Microsecond // initialBackoff less jitter + totalBackoff := timeutil.Since(start) + require.Greater(t, totalBackoff.Nanoseconds(), (totalRestarts * minimumBackoff).Nanoseconds()) + r.Close() + require.True(t, gotToTheEnd) + }) + t.Run("withDiff", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + sp := roachpb.Span{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("c"), + } + mc := mockClient{ + scan: func( + ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), + ) error { + t.Error("this should not be called") + return nil + }, + rangefeed: func( + ctx context.Context, span roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- *roachpb.RangeFeedEvent, + ) error { + assert.True(t, withDiff) + eventC <- &roachpb.RangeFeedEvent{ + Val: &roachpb.RangeFeedValue{ + Key: sp.Key, + }, + } + <-ctx.Done() + return ctx.Err() + }, + } + f := rangefeed.NewFactoryWithDB(stopper, &mc) + rows := make(chan *roachpb.RangeFeedValue) + r, err := f.RangeFeed(ctx, "foo", sp, hlc.Timestamp{}, func( + ctx context.Context, value *roachpb.RangeFeedValue, + ) { + rows <- value + }, rangefeed.WithDiff()) + require.NoError(t, err) + <-rows + r.Close() + }) + t.Run("stopper already stopped", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + sp := roachpb.Span{ + Key: 
roachpb.Key("a"), + EndKey: roachpb.Key("c"), + } + stopper.Stop(ctx) + f := rangefeed.NewFactoryWithDB(stopper, &mockClient{}) + r, err := f.RangeFeed(ctx, "foo", sp, hlc.Timestamp{}, func( + ctx context.Context, value *roachpb.RangeFeedValue, + ) { + }) + require.Nil(t, r) + require.True(t, errors.Is(err, stop.ErrUnavailable), "%v", err) + }) + t.Run("initial scan error", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + sp := roachpb.Span{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("c"), + } + var called int + f := rangefeed.NewFactoryWithDB(stopper, &mockClient{ + scan: func(ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue)) error { + return errors.New("boom") + }, + }, nil /* knobs */) + done := make(chan struct{}) + r, err := f.RangeFeed(ctx, "foo", sp, hlc.Timestamp{}, func( + ctx context.Context, value *roachpb.RangeFeedValue, + ) { + }, + rangefeed.WithInitialScan(nil), + rangefeed.WithOnInitialScanError(func(ctx context.Context, err error) (shouldFail bool) { + if called++; called <= 1 { + close(done) + return false + } + return true + })) + require.NotNil(t, r) + require.NoError(t, err) + <-done + r.Close() + }) +} From 3cda1c7113556f07df9beefb9c016e5c5d7422f0 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 29 Dec 2020 11:07:29 -0500 Subject: [PATCH 3/8] server,lease: adopt new rangefeed package Release note: None --- pkg/base/testing_knobs.go | 1 + pkg/kv/kvclient/rangefeed/BUILD.bazel | 4 + pkg/kv/kvclient/rangefeed/rangefeed.go | 26 +++++- .../rangefeed/rangefeed_external_test.go | 2 +- .../kvclient/rangefeed/rangefeed_mock_test.go | 64 ++++++++++++- pkg/server/BUILD.bazel | 1 + pkg/server/server.go | 8 ++ pkg/server/server_sql.go | 6 ++ pkg/server/testserver.go | 7 ++ pkg/sql/BUILD.bazel | 2 + pkg/sql/catalog/lease/BUILD.bazel | 5 +- pkg/sql/catalog/lease/lease.go | 92 ++++--------------- pkg/sql/catalog/lease/lease_test.go | 72 
+-------------- pkg/sql/exec_util.go | 3 + pkg/sql/schema_changer_test.go | 4 + 15 files changed, 140 insertions(+), 157 deletions(-) diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index 04a62ea6ae01..541827910bba 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -21,6 +21,7 @@ type ModuleTestingKnobs interface { type TestingKnobs struct { Store ModuleTestingKnobs KVClient ModuleTestingKnobs + RangeFeed ModuleTestingKnobs SQLExecutor ModuleTestingKnobs SQLLeaseManager ModuleTestingKnobs SQLSchemaChanger ModuleTestingKnobs diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index f1e94ac54a84..3ce86fc60db6 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/roachpb", @@ -38,9 +39,11 @@ go_test( deps = [ "//pkg/base", "//pkg/roachpb", + "//pkg/rpc", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", + "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", "//pkg/util/encoding", @@ -52,5 +55,6 @@ go_test( "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//:go_default_library", ], ) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index 2433d96f4213..a612892105fb 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -59,21 +60,35 @@ type kvDB interface { type Factory struct { stopper 
*stop.Stopper client kvDB + knobs *TestingKnobs } +// TestingKnobs is used to inject behavior into a rangefeed for testing. +type TestingKnobs struct { + + // OnRangefeedRestart is called when a rangefeed restarts. + OnRangefeedRestart func() +} + +// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. +func (t TestingKnobs) ModuleTestingKnobs() {} + +var _ base.ModuleTestingKnobs = (*TestingKnobs)(nil) + // NewFactory constructs a new Factory. -func NewFactory(stopper *stop.Stopper, db *kv.DB) (*Factory, error) { +func NewFactory(stopper *stop.Stopper, db *kv.DB, knobs *TestingKnobs) (*Factory, error) { kvDB, err := newDBAdapter(db) if err != nil { return nil, err } - return newFactory(stopper, kvDB), nil + return newFactory(stopper, kvDB, knobs), nil } -func newFactory(stopper *stop.Stopper, client kvDB) *Factory { +func newFactory(stopper *stop.Stopper, client kvDB, knobs *TestingKnobs) *Factory { return &Factory{ stopper: stopper, client: client, + knobs: knobs, } } @@ -90,6 +105,7 @@ func (f *Factory) RangeFeed( r := RangeFeed{ client: f.client, stopper: f.stopper, + knobs: f.knobs, initialTimestamp: initialTimestamp, span: span, @@ -112,6 +128,7 @@ type RangeFeed struct { config client kvDB stopper *stop.Stopper + knobs *TestingKnobs initialTimestamp hlc.Timestamp @@ -201,6 +218,9 @@ func (f *RangeFeed) run(ctx context.Context) { ranFor := timeutil.Since(start) log.VEventf(ctx, 1, "restarting rangefeed for %v after %v", log.Safe(f.span), ranFor) + if f.knobs != nil && f.knobs.OnRangefeedRestart != nil { + f.knobs.OnRangefeedRestart() + } // If the rangefeed ran successfully for long enough, reset the retry // state so that the exponential backoff begins from its minimum value. 
diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index ba661558b54a..9b130b8374d8 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -60,7 +60,7 @@ func TestRangeFeedIntegration(t *testing.T) { require.NoError(t, err) } - f, err := rangefeed.NewFactory(tc.Stopper(), db) + f, err := rangefeed.NewFactory(tc.Stopper(), db, nil) require.NoError(t, err) rows := make(chan *roachpb.RangeFeedValue) initialScanDone := make(chan struct{}) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go index 12d764f86fb2..12a3d2838e51 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go @@ -12,11 +12,18 @@ package rangefeed_test import ( "context" + "strings" + "sync/atomic" "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -25,6 +32,7 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" ) type mockClient struct { @@ -100,7 +108,7 @@ func TestRangeFeedMock(t *testing.T) { return nil }, } - f := rangefeed.NewFactoryWithDB(stopper, &mc) + f := rangefeed.NewFactoryWithDB(stopper, &mc, nil /* knobs */) require.NotNil(t, f) rows := make(chan *roachpb.RangeFeedValue) @@ -218,7 +226,7 @@ func TestRangeFeedMock(t *testing.T) { } }, } - f := 
rangefeed.NewFactoryWithDB(stopper, &mc) + f := rangefeed.NewFactoryWithDB(stopper, &mc, nil /* knobs */) rows := make(chan *roachpb.RangeFeedValue) r, err := f.RangeFeed(ctx, "foo", sp, initialTS, func( ctx context.Context, value *roachpb.RangeFeedValue, @@ -266,7 +274,7 @@ func TestRangeFeedMock(t *testing.T) { return ctx.Err() }, } - f := rangefeed.NewFactoryWithDB(stopper, &mc) + f := rangefeed.NewFactoryWithDB(stopper, &mc, nil /* knobs */) rows := make(chan *roachpb.RangeFeedValue) r, err := f.RangeFeed(ctx, "foo", sp, hlc.Timestamp{}, func( ctx context.Context, value *roachpb.RangeFeedValue, @@ -285,7 +293,7 @@ func TestRangeFeedMock(t *testing.T) { EndKey: roachpb.Key("c"), } stopper.Stop(ctx) - f := rangefeed.NewFactoryWithDB(stopper, &mockClient{}) + f := rangefeed.NewFactoryWithDB(stopper, &mockClient{}, nil /* knobs */) r, err := f.RangeFeed(ctx, "foo", sp, hlc.Timestamp{}, func( ctx context.Context, value *roachpb.RangeFeedValue, ) { @@ -326,3 +334,51 @@ func TestRangeFeedMock(t *testing.T) { r.Close() }) } + +// TestBackoffOnRangefeedFailure ensures that the backoff occurs when a +// rangefeed fails. It observes restarts via the OnRangefeedRestart knob. +func TestBackoffOnRangefeedFailure(t *testing.T) { + defer leaktest.AfterTest(t)() + + var called int64 + const timesToFail = 3 + rpcKnobs := rpc.ContextTestingKnobs{ + StreamClientInterceptor: func( + target string, class rpc.ConnectionClass, + ) grpc.StreamClientInterceptor { + return func( + ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, + method string, streamer grpc.Streamer, opts ...grpc.CallOption, + ) (stream grpc.ClientStream, err error) { + if strings.Contains(method, "RangeFeed") && + atomic.AddInt64(&called, 1) <= timesToFail { + return nil, errors.Errorf("boom") + } + return streamer(ctx, desc, cc, method, opts...) 
+ } + }, + } + ctx := context.Background() + var seen int64 + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ContextTestingKnobs: rpcKnobs, + }, + RangeFeed: &rangefeed.TestingKnobs{ + OnRangefeedRestart: func() { + atomic.AddInt64(&seen, 1) + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + testutils.SucceedsSoon(t, func() error { + if n := atomic.LoadInt64(&seen); n < timesToFail { + return errors.Errorf("seen %d, waiting for %d", n, timesToFail) + } + return nil + }) +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index b6e4de89af6d..5702539bf36f 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -64,6 +64,7 @@ go_library( "//pkg/kv/kvclient", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/kvtenant", + "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvserver", "//pkg/kv/kvserver/closedts/container", "//pkg/kv/kvserver/kvserverbase", diff --git a/pkg/server/server.go b/pkg/server/server.go index 47521640cb23..e623882d6a6d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/container" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -425,6 +426,12 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { } } + rangeFeedKnobs, _ := cfg.TestingKnobs.RangeFeed.(*rangefeed.TestingKnobs) + rangeFeedFactory, err := rangefeed.NewFactory(stopper, db, rangeFeedKnobs) + if err != nil { + return nil, err + } + nodeLiveness := liveness.NewNodeLiveness(liveness.NodeLivenessOptions{ AmbientCtx: cfg.AmbientCtx, Clock: clock, @@ -657,6 +664,7 @@ func 
NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { circularJobRegistry: jobRegistry, jobAdoptionStopFile: jobAdoptionStopFile, protectedtsProvider: protectedtsProvider, + rangeFeedFactory: rangeFeedFactory, sqlStatusServer: sStatus, }) if err != nil { diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index b7f083b15d3f..df4a47364826 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/bulk" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/migration/migrationcluster" @@ -221,6 +222,9 @@ type sqlServerArgs struct { // Used to list sessions and cancel sessions/queries. sqlStatusServer serverpb.SQLStatusServer + + // Used to watch settings and descriptor changes. 
+ rangeFeedFactory *rangefeed.Factory } func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { @@ -300,6 +304,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { codec, lmKnobs, cfg.stopper, + cfg.rangeFeedFactory, cfg.LeaseManagerConfig, ) cfg.registry.AddMetricStruct(leaseMgr.MetricsStruct()) @@ -523,6 +528,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { HydratedTables: hydratedTablesCache, GCJobNotifier: gcJobNotifier, ContentionRegistry: contention.NewRegistry(), + RangeFeedFactory: cfg.rangeFeedFactory, } if sqlSchemaChangerTestingKnobs := cfg.TestingKnobs.SQLSchemaChanger; sqlSchemaChangerTestingKnobs != nil { diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 6a250d9b48df..fd2eb27b04f7 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" @@ -551,6 +552,11 @@ func makeSQLServerArgs( ds, ) db := kv.NewDB(baseCfg.AmbientCtx, tcsFactory, clock, stopper) + rangeFeedKnobs, _ := baseCfg.TestingKnobs.RangeFeed.(*rangefeed.TestingKnobs) + rangeFeedFactory, err := rangefeed.NewFactory(stopper, db, rangeFeedKnobs) + if err != nil { + return sqlServerArgs{}, err + } circularInternalExecutor := &sql.InternalExecutor{} // Protected timestamps won't be available (at first) in multi-tenant @@ -626,6 +632,7 @@ func makeSQLServerArgs( circularInternalExecutor: circularInternalExecutor, circularJobRegistry: &jobs.Registry{}, protectedtsProvider: protectedTSProvider, + rangeFeedFactory: rangeFeedFactory, sqlStatusServer: 
newTenantStatusServer( baseCfg.AmbientCtx, &adminPrivilegeChecker{ie: circularInternalExecutor}, sessionRegistry, baseCfg.Settings, ), diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 5a54c4d79d3f..75929b8ba954 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -247,6 +247,7 @@ go_library( "//pkg/kv/kvclient", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/rangecache", + "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvserver", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/protectedts", @@ -503,6 +504,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/rangecache", + "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", diff --git a/pkg/sql/catalog/lease/BUILD.bazel b/pkg/sql/catalog/lease/BUILD.bazel index 97dc59139aac..f4003155ce7c 100644 --- a/pkg/sql/catalog/lease/BUILD.bazel +++ b/pkg/sql/catalog/lease/BUILD.bazel @@ -14,7 +14,7 @@ go_library( "//pkg/gossip", "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvclient/kvcoord", + "//pkg/kv/kvclient/rangefeed", "//pkg/roachpb", "//pkg/security", "//pkg/settings", @@ -57,7 +57,6 @@ go_test( "//pkg/keys", "//pkg/kv", "//pkg/roachpb", - "//pkg/rpc", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", @@ -81,7 +80,6 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", - "//pkg/util/log/logpb", "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/stop", @@ -94,6 +92,5 @@ go_test( "@com_github_lib_pq//:pq", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", - "@org_golang_google_grpc//:go_default_library", ], ) diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 78568969a043..1309f60cede3 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -27,7 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - 
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings" @@ -1312,8 +1312,9 @@ func makeNameCacheKey(parentID descpb.ID, parentSchemaID descpb.ID, name string) // The locking order is: // Manager.mu > descriptorState.mu > nameCache.mu > descriptorVersionState.mu type Manager struct { - storage storage - mu struct { + rangeFeedFactory *rangefeed.Factory + storage storage + mu struct { syncutil.Mutex descriptors map[descpb.ID]*descriptorState @@ -1353,6 +1354,7 @@ func NewLeaseManager( codec keys.SQLCodec, testingKnobs ManagerTestingKnobs, stopper *stop.Stopper, + rangeFeedFactory *rangefeed.Factory, cfg *base.LeaseManagerConfig, ) *Manager { lm := &Manager{ @@ -1375,7 +1377,8 @@ func NewLeaseManager( Unit: metric.Unit_COUNT, }), }, - testingKnobs: testingKnobs, + rangeFeedFactory: rangeFeedFactory, + testingKnobs: testingKnobs, names: nameCache{ descriptors: make(map[nameCacheKey]*descriptorVersionState), }, @@ -1838,54 +1841,14 @@ func (m *Manager) watchForRangefeedUpdates( if log.V(1) { log.Infof(ctx, "using rangefeeds for lease manager updates") } - distSender := db.NonTransactionalSender().(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender) - eventCh := make(chan *roachpb.RangeFeedEvent) - ctx, _ = s.WithCancelOnQuiesce(ctx) - if err := s.RunAsyncTask(ctx, "lease rangefeed", func(ctx context.Context) { - - // Run the rangefeed in a loop in the case of failure, likely due to node - // failures or general unavailability. We'll reset the retrier if the - // rangefeed runs for longer than the resetThreshold. 
- const resetThreshold = 30 * time.Second - restartLogEvery := log.Every(10 * time.Second) - for i, r := 1, retry.StartWithCtx(ctx, retry.Options{ - InitialBackoff: 100 * time.Millisecond, - MaxBackoff: 2 * time.Second, - Closer: s.ShouldQuiesce(), - }); r.Next(); i++ { - ts := m.getResolvedTimestamp() - descKeyPrefix := m.storage.codec.TablePrefix(uint32(systemschema.DescriptorTable.GetID())) - span := roachpb.Span{ - Key: descKeyPrefix, - EndKey: descKeyPrefix.PrefixEnd(), - } - // Note: We don't need to use withDiff to detect version changes because - // the Manager already stores the relevant version information. - const withDiff = false - log.VEventf(ctx, 1, "starting rangefeed from %v on %v", ts, span) - start := timeutil.Now() - err := distSender.RangeFeed(ctx, span, ts, withDiff, eventCh) - if err != nil && ctx.Err() == nil && restartLogEvery.ShouldLog() { - log.Warningf(ctx, "lease rangefeed failed %d times, restarting: %v", - log.Safe(i), log.Safe(err)) - } - if ctx.Err() != nil { - log.VEventf(ctx, 1, "exiting rangefeed") - return - } - ranFor := timeutil.Since(start) - log.VEventf(ctx, 1, "restarting rangefeed for %v after %v", - log.Safe(span), ranFor) - if ranFor > resetThreshold { - i = 1 - r.Reset() - } - } - }); err != nil { - // This will only fail if the stopper has been stopped. 
- return + descriptorTableStart := m.Codec().TablePrefix(keys.DescriptorTableID) + descriptorTableSpan := roachpb.Span{ + Key: descriptorTableStart, + EndKey: descriptorTableStart.PrefixEnd(), } - handleEvent := func(ev *roachpb.RangeFeedValue) { + handleEvent := func( + ctx context.Context, ev *roachpb.RangeFeedValue, + ) { if len(ev.Value.RawBytes) == 0 { return } @@ -1909,29 +1872,10 @@ func (m *Manager) watchForRangefeedUpdates( case descUpdateCh <- &descriptor: } } - _ = s.RunAsyncTask(ctx, "lease-rangefeed", func(ctx context.Context) { - for { - select { - case <-m.stopper.ShouldQuiesce(): - return - case <-ctx.Done(): - return - case e := <-eventCh: - if e.Checkpoint != nil { - log.VEventf(ctx, 2, "got rangefeed checkpoint %v", e.Checkpoint) - m.setResolvedTimestamp(e.Checkpoint.ResolvedTS) - continue - } - if e.Error != nil { - log.Warningf(ctx, "got an error from a rangefeed: %v", e.Error.Error) - continue - } - if e.Val != nil { - handleEvent(e.Val) - } - } - } - }) + // Ignore errors here because they indicate that the server is shutting down. 
+ _, _ = m.rangeFeedFactory.RangeFeed( + ctx, "lease", descriptorTableSpan, m.getResolvedTimestamp(), handleEvent, + ) } func (m *Manager) handleUpdatedSystemCfg( diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index d4e150024733..c6f25cb98bc7 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -17,8 +17,6 @@ import ( "context" gosql "database/sql" "fmt" - "regexp" - "strings" "sync" "sync/atomic" "testing" @@ -29,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -52,7 +49,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -62,7 +58,6 @@ import ( "github.com/lib/pq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc" ) type leaseTest struct { @@ -230,6 +225,7 @@ func (t *leaseTest) node(nodeID uint32) *lease.Manager { cfgCpy.Codec, t.leaseManagerTestingKnobs, t.server.Stopper(), + cfgCpy.RangeFeedFactory, t.cfg, ) ctx := logtags.AddTag(context.Background(), "leasemgr", nodeID) @@ -2312,72 +2308,6 @@ func TestRangefeedUpdatesHandledProperlyInTheFaceOfRaces(t *testing.T) { require.Equal(t, gosql.ErrNoRows, db2.QueryRow("SELECT i, j FROM foo").Scan(&i, &j)) } -// TestBackoffOnRangefeedFailure ensures that the backoff occurs when a -// rangefeed fails. It observes this indirectly by looking at logs. 
-func TestBackoffOnRangefeedFailure(t *testing.T) { - defer leaktest.AfterTest(t)() - - var called int64 - const timesToFail = 3 - rpcKnobs := rpc.ContextTestingKnobs{ - StreamClientInterceptor: func( - target string, class rpc.ConnectionClass, - ) grpc.StreamClientInterceptor { - return func( - ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, - method string, streamer grpc.Streamer, opts ...grpc.CallOption, - ) (stream grpc.ClientStream, err error) { - if strings.Contains(method, "RangeFeed") && - atomic.AddInt64(&called, 1) <= timesToFail { - return nil, errors.Errorf("boom") - } - return streamer(ctx, desc, cc, method, opts...) - } - }, - } - ctx := context.Background() - var seen struct { - syncutil.Mutex - entries []logpb.Entry - } - restartingRE := regexp.MustCompile("restarting rangefeed.*after.*") - log.Intercept(ctx, func(entry logpb.Entry) { - if !restartingRE.MatchString(entry.Message) { - return - } - seen.Lock() - defer seen.Unlock() - seen.entries = append(seen.entries, entry) - }) - defer log.Intercept(ctx, nil) - tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - ContextTestingKnobs: rpcKnobs, - }, - }, - }, - }) - defer tc.Stopper().Stop(ctx) - testutils.SucceedsSoon(t, func() error { - seen.Lock() - defer seen.Unlock() - if len(seen.entries) < timesToFail { - return errors.Errorf("seen %d, waiting for %d", len(seen.entries), timesToFail) - } - return nil - }) - seen.Lock() - defer seen.Unlock() - minimumBackoff := 85 * time.Millisecond // initialBackoff less jitter - var totalBackoff time.Duration - for i := 1; i < len(seen.entries); i++ { - totalBackoff += time.Duration(seen.entries[i].Time - seen.entries[i-1].Time) - } - require.Greater(t, totalBackoff.Nanoseconds(), (3 * minimumBackoff).Nanoseconds()) -} - // TestLeaseWithOfflineTables checks that leases on tables which had // previously gone offline at some point are not 
gratuitously dropped. // See #57834. diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 0dfb85233cfa..7a7e2f50c4e9 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/migration" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -796,6 +797,8 @@ type ExecutorConfig struct { GCJobNotifier *gcjobnotifier.Notifier + RangeFeedFactory *rangefeed.Factory + // VersionUpgradeHook is called after validating a `SET CLUSTER SETTING // version` but before executing it. It can carry out arbitrary migrations // that allow us to eventually remove legacy code. It will only be populated diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 686064a1b090..c75c668e9dd9 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -91,6 +92,8 @@ func TestSchemaChangeProcess(t *testing.T) { stopper := stop.NewStopper() cfg := base.NewLeaseManagerConfig() execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + rf, err := rangefeed.NewFactory(stopper, kvDB, nil /* knobs */) + require.NoError(t, err) leaseMgr := lease.NewLeaseManager( log.AmbientContext{Tracer: tracing.NewTracer()}, execCfg.NodeID, @@ -101,6 +104,7 @@ func TestSchemaChangeProcess(t *testing.T) { execCfg.Codec, 
lease.ManagerTestingKnobs{}, stopper, + rf, cfg, ) jobRegistry := s.JobRegistry().(*jobs.Registry) From 46c4ecaa8b83fbe14e8c100c76cc9f9a8faf378a Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 30 Dec 2020 00:48:34 -0500 Subject: [PATCH 4/8] server/settingswatcher: introduce a package to watch settings This commit only barely touches existing code by extracting the logic to decode rows from the settings table. Release note: None --- pkg/BUILD.bazel | 1 + pkg/ccl/serverccl/server_sql_test.go | 2 +- pkg/kv/kvclient/rangefeed/rangefeed.go | 5 +- pkg/server/BUILD.bazel | 4 +- pkg/server/settingswatcher/BUILD.bazel | 60 ++++++++ pkg/server/settingswatcher/main_test.go | 31 ++++ pkg/server/settingswatcher/row_decoder.go | 110 ++++++++++++++ .../row_decoder_external_test.go | 100 +++++++++++++ .../settingswatcher/settings_watcher.go | 136 ++++++++++++++++++ .../settings_watcher_external_test.go | 103 +++++++++++++ pkg/server/settingsworker.go | 69 +-------- 11 files changed, 551 insertions(+), 70 deletions(-) create mode 100644 pkg/server/settingswatcher/BUILD.bazel create mode 100644 pkg/server/settingswatcher/main_test.go create mode 100644 pkg/server/settingswatcher/row_decoder.go create mode 100644 pkg/server/settingswatcher/row_decoder_external_test.go create mode 100644 pkg/server/settingswatcher/settings_watcher.go create mode 100644 pkg/server/settingswatcher/settings_watcher_external_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 16984357c280..86f9905f191f 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -131,6 +131,7 @@ ALL_TESTS = [ "//pkg/server/goroutinedumper:goroutinedumper_test", "//pkg/server/heapprofiler:heapprofiler_test", "//pkg/server/serverpb:serverpb_test", + "//pkg/server/settingswatcher:settingswatcher_test", "//pkg/server/status:status_test", "//pkg/server/telemetry:telemetry_test", "//pkg/server:server_test", diff --git a/pkg/ccl/serverccl/server_sql_test.go b/pkg/ccl/serverccl/server_sql_test.go index 
4825e0e25487..8276f065ecdb 100644 --- a/pkg/ccl/serverccl/server_sql_test.go +++ b/pkg/ccl/serverccl/server_sql_test.go @@ -101,7 +101,7 @@ func TestTenantUnauthenticatedAccess(t *testing.T) { }, }) require.Error(t, err) - require.Regexp(t, `Unauthenticated desc = requested key /Tenant/11/System/"system-version/" not fully contained in tenant keyspace /Tenant/1{0-1}`, err) + require.Regexp(t, `Unauthenticated desc = requested key .* not fully contained in tenant keyspace /Tenant/1{0-1}`, err) } // TestTenantHTTP verifies that SQL tenant servers expose metrics and debugging endpoints. diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index a612892105fb..fbea8d55fc28 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -29,7 +29,8 @@ import ( // TODO(ajwerner): Expose hooks for metrics. // TODO(ajwerner): Expose access to checkpoints and the frontier. -// TODO(ajwerner): Expose better control over how the retrier gets reset. +// TODO(ajwerner): Expose better control over how the exponential backoff gets +// reset when the feed has been running successfully for a while. // kvDB is an adapter to the underlying KV store. type kvDB interface { @@ -44,7 +45,7 @@ type kvDB interface { eventC chan<- *roachpb.RangeFeedEvent, ) error - // Scan encapsulates scanning a keyspan at a given point in time. The method + // Scan encapsulates scanning a key span at a given point in time. The method // deals with pagination, calling the caller back for each row. Note that // the API does not require that the rows be ordered to allow for future // parallelism. 
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 5702539bf36f..564b6b8af76a 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -89,13 +89,13 @@ go_library( "//pkg/server/goroutinedumper", "//pkg/server/heapprofiler", "//pkg/server/serverpb", + "//pkg/server/settingswatcher", "//pkg/server/status", "//pkg/server/status/statuspb:statuspb_go_proto", "//pkg/server/telemetry", "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql", - "//pkg/sql/catalog", "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catconstants", "//pkg/sql/catalog/colinfo", @@ -116,7 +116,6 @@ go_library( "//pkg/sql/physicalplan", "//pkg/sql/querycache", "//pkg/sql/roleoption", - "//pkg/sql/rowenc", "//pkg/sql/schemachanger/scjob", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", @@ -138,7 +137,6 @@ go_library( "//pkg/ui", "//pkg/util", "//pkg/util/contextutil", - "//pkg/util/encoding", "//pkg/util/envutil", "//pkg/util/grpcutil", "//pkg/util/hlc", diff --git a/pkg/server/settingswatcher/BUILD.bazel b/pkg/server/settingswatcher/BUILD.bazel new file mode 100644 index 000000000000..556e8f83d5e6 --- /dev/null +++ b/pkg/server/settingswatcher/BUILD.bazel @@ -0,0 +1,60 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "settingswatcher", + srcs = [ + "row_decoder.go", + "settings_watcher.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/server/settingswatcher", + visibility = ["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/kv/kvclient/rangefeed", + "//pkg/roachpb", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/systemschema", + "//pkg/sql/row", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", + "//pkg/sql/types", + "//pkg/util/encoding", + "//pkg/util/grpcutil", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/stop", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "settingswatcher_test", + srcs = [ + 
"main_test.go", + "row_decoder_external_test.go", + "settings_watcher_external_test.go", + ], + deps = [ + ":settingswatcher", + "//pkg/base", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/sql", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/server/settingswatcher/main_test.go b/pkg/server/settingswatcher/main_test.go new file mode 100644 index 000000000000..25345ae44616 --- /dev/null +++ b/pkg/server/settingswatcher/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package settingswatcher_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/server/settingswatcher/row_decoder.go b/pkg/server/settingswatcher/row_decoder.go new file mode 100644 index 000000000000..0a80e238c062 --- /dev/null +++ b/pkg/server/settingswatcher/row_decoder.go @@ -0,0 +1,110 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package settingswatcher + +import ( + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/errors" +) + +// RowDecoder decodes rows from the settings table. 
+type RowDecoder struct { + codec keys.SQLCodec + alloc rowenc.DatumAlloc + colIdxMap catalog.TableColMap +} + +// MakeRowDecoder makes a new RowDecoder for the settings table. +func MakeRowDecoder(codec keys.SQLCodec) RowDecoder { + return RowDecoder{ + codec: codec, + colIdxMap: row.ColIDtoRowIndexFromCols( + systemschema.SettingsTable.TableDesc().Columns, + ), + } +} + +// DecodeRow decodes a row of the system.settings table. If the value is not +// present, the setting key will be returned but the other two fields will be +// zero and the tombstone bool will be set. +func (d *RowDecoder) DecodeRow( + kv roachpb.KeyValue, +) (setting, val, valType string, tombstone bool, _ error) { + tbl := systemschema.SettingsTable + // First we need to decode the setting name field from the index key. + { + types := []*types.T{tbl.PublicColumns()[0].GetType()} + nameRow := make([]rowenc.EncDatum, 1) + _, matches, _, err := rowenc.DecodeIndexKey(d.codec, tbl, tbl.GetPrimaryIndex().IndexDesc(), types, nameRow, nil, kv.Key) + if err != nil { + return "", "", "", false, errors.Wrap(err, "failed to decode key") + } + if !matches { + return "", "", "", false, errors.Errorf("unexpected non-settings KV with settings prefix: %v", kv.Key) + } + if err := nameRow[0].EnsureDecoded(types[0], &d.alloc); err != nil { + return "", "", "", false, err + } + setting = string(tree.MustBeDString(nameRow[0].Datum)) + } + if !kv.Value.IsPresent() { + return setting, "", "", true, nil + } + + // The rest of the columns are stored as a family, packed with diff-encoded + // column IDs followed by their values. + { + // column valueType can be null (missing) so we default it to "s". 
+ valType = "s" + bytes, err := kv.Value.GetTuple() + if err != nil { + return "", "", "", false, err + } + var colIDDiff uint32 + var lastColID descpb.ColumnID + var res tree.Datum + for len(bytes) > 0 { + _, _, colIDDiff, _, err = encoding.DecodeValueTag(bytes) + if err != nil { + return "", "", "", false, err + } + colID := lastColID + descpb.ColumnID(colIDDiff) + lastColID = colID + if idx, ok := d.colIdxMap.Get(colID); ok { + res, bytes, err = rowenc.DecodeTableValue(&d.alloc, tbl.PublicColumns()[idx].GetType(), bytes) + if err != nil { + return "", "", "", false, err + } + switch colID { + case tbl.PublicColumns()[1].GetID(): // value + val = string(tree.MustBeDString(res)) + case tbl.PublicColumns()[3].GetID(): // valueType + valType = string(tree.MustBeDString(res)) + case tbl.PublicColumns()[2].GetID(): // lastUpdated + // TODO(dt): we could decode just the len and then seek `bytes` past + // it, without allocating/decoding the unused timestamp. + default: + return "", "", "", false, errors.Errorf("unknown column: %v", colID) + } + } + } + } + return setting, val, valType, false, nil +} diff --git a/pkg/server/settingswatcher/row_decoder_external_test.go b/pkg/server/settingswatcher/row_decoder_external_test.go new file mode 100644 index 000000000000..25d3438fcec7 --- /dev/null +++ b/pkg/server/settingswatcher/row_decoder_external_test.go @@ -0,0 +1,100 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package settingswatcher_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/settingswatcher" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestRowDecoder simply verifies that the row decoder can safely decode the +// rows stored in the settings table of a real cluster with a few values of a +// few different types set. +func TestRowDecoder(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + toSet := map[string]struct { + val interface{} + expStr string + expValType string + }{ + "kv.rangefeed.enabled": { + val: true, + expStr: "true", + expValType: "b", + }, + "kv.queue.process.guaranteed_time_budget": { + val: "17s", + expStr: "17s", + expValType: "d", + }, + "kv.closed_timestamp.close_fraction": { + val: .23, + expStr: "0.23", + expValType: "f", + }, + "cluster.organization": { + val: "foobar", + expStr: "foobar", + expValType: "s", + }, + } + for k, v := range toSet { + tdb.Exec(t, "SET CLUSTER SETTING "+k+" = $1", v.val) + } + + k := keys.SystemSQLCodec.TablePrefix(keys.SettingsTableID) + rows, err := tc.Server(0).DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */) + require.NoError(t, err) + dec := settingswatcher.MakeRowDecoder(keys.SystemSQLCodec) + for _, row := range rows { + kv := roachpb.KeyValue{ + Key: row.Key, + Value: *row.Value, + } + + k, val, valType, tombstone, err := dec.DecodeRow(kv) + require.NoError(t, err) + require.False(t, tombstone) + if exp, ok := toSet[k]; ok { + require.Equal(t, exp.expStr, val) + 
require.Equal(t, exp.expValType, valType) + delete(toSet, k) + } + + // Test the tombstone logic while we're here. + { + kv.Value.Reset() + tombstoneK, val, valType, tombstone, err := dec.DecodeRow(kv) + require.NoError(t, err) + require.True(t, tombstone) + require.Equal(t, k, tombstoneK) + require.Zero(t, val) + require.Zero(t, valType) + } + } + require.Len(t, toSet, 0) +} diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go new file mode 100644 index 000000000000..a5ca60de0996 --- /dev/null +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -0,0 +1,136 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package settingswatcher provides utilities to update cluster settings using +// a range feed. +package settingswatcher + +import ( + "context" + "strings" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/grpcutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" +) + +// SettingsWatcher is used to watch for cluster settings changes with a +// rangefeed. +type SettingsWatcher struct { + clock *hlc.Clock + codec keys.SQLCodec + settings *cluster.Settings + f *rangefeed.Factory + stopper *stop.Stopper + dec RowDecoder +} + +// New constructs a new SettingsWatcher. 
+func New( + clock *hlc.Clock, + codec keys.SQLCodec, + settingsToUpdate *cluster.Settings, + f *rangefeed.Factory, + stopper *stop.Stopper, +) *SettingsWatcher { + return &SettingsWatcher{ + clock: clock, + codec: codec, + settings: settingsToUpdate, + f: f, + stopper: stopper, + dec: MakeRowDecoder(codec), + } +} + +// Start will start the SettingsWatcher. It returns after the initial settings +// have been retrieved. An error will be returned if the context is canceled or +// the stopper is stopped prior to the initial data being retrieved. +func (s *SettingsWatcher) Start(ctx context.Context) error { + settingsTablePrefix := s.codec.TablePrefix(keys.SettingsTableID) + settingsTableSpan := roachpb.Span{ + Key: settingsTablePrefix, + EndKey: settingsTablePrefix.PrefixEnd(), + } + now := s.clock.Now() + u := s.settings.MakeUpdater() + initialScanDone := make(chan struct{}) + var initialScanErr error + rf, err := s.f.RangeFeed(ctx, "settings", settingsTableSpan, now, func( + ctx context.Context, kv *roachpb.RangeFeedValue, + ) { + k, val, valType, tombstone, err := s.dec.DecodeRow(roachpb.KeyValue{ + Key: kv.Key, + Value: kv.Value, + }) + if err != nil { + log.Warningf(ctx, "failed to decode settings row %v: %v", kv.Key, err) + } + // This event corresponds to a deletion. 
+ if tombstone { + s, ok := settings.Lookup(k, settings.LookupForLocalAccess) + if !ok { + log.Warningf(ctx, "failed to find setting %s, skipping update", + log.Safe(k)) + return + } + ws, ok := s.(settings.WritableSetting) + if !ok { + log.Fatalf(ctx, "expected writable setting, got %T", s) + } + val, valType = ws.EncodedDefault(), ws.Typ() + } + if err := u.Set(k, val, valType); err != nil { + log.Warningf(ctx, "failed to set setting %s to %s: %v", + log.Safe(k), val, err) + } + }, rangefeed.WithInitialScan(func(ctx context.Context) { + u.ResetRemaining() + close(initialScanDone) + }), rangefeed.WithOnInitialScanError(func( + ctx context.Context, err error, + ) (shouldFail bool) { + // TODO(ajwerner): Consider if there are other errors which we want to + // treat as permanent. + if grpcutil.IsAuthenticationError(err) || + // This is a hack around the fact that we do not get properly structured + // errors out of gRPC. See #56208. + strings.Contains(err.Error(), "rpc error: code = Unauthenticated") { + initialScanErr = err + close(initialScanDone) + shouldFail = true + } + return shouldFail + })) + if err != nil { + return err + } + select { + case <-initialScanDone: + if initialScanErr != nil { + return initialScanErr + } + s.stopper.AddCloser(rf) + return nil + case <-s.stopper.ShouldQuiesce(): + return errors.Wrap(stop.ErrUnavailable, + "failed to retrieve initial cluster settings") + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), + "failed to retrieve initial cluster settings") + } +} diff --git a/pkg/server/settingswatcher/settings_watcher_external_test.go b/pkg/server/settingswatcher/settings_watcher_external_test.go new file mode 100644 index 000000000000..65a903bf1503 --- /dev/null +++ b/pkg/server/settingswatcher/settings_watcher_external_test.go @@ -0,0 +1,103 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package settingswatcher_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/settingswatcher" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestSettingWatcher constructs a SettingsWatcher under a hypothetical tenant +// and then copies some values over to that tenant. It then ensures that the +// initial settings are picked up and that changes are also eventually picked +// up. +func TestSettingWatcher(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + // Interleaved tables are overridden to be on in testservers even though + // that is not the default value.
+ tdb.Exec(t, "SET CLUSTER SETTING sql.defaults.interleaved_tables.enabled = false") + + toSet := map[string][]interface{}{ + "sql.defaults.experimental_hash_sharded_indexes.enabled": {true, false}, + "kv.queue.process.guaranteed_time_budget": {"17s", "20s"}, + "kv.closed_timestamp.close_fraction": {.23, .55}, + "cluster.organization": {"foobar", "bazbax"}, + } + fakeTenant := roachpb.MakeTenantID(2) + systemTable := keys.SystemSQLCodec.TablePrefix(keys.SettingsTableID) + fakeCodec := keys.MakeSQLCodec(fakeTenant) + fakeTenantPrefix := keys.MakeTenantPrefix(fakeTenant) + + db := tc.Server(0).DB() + + copySettingsFromSystemToFakeTenant := func() { + rows, err := db.Scan(ctx, systemTable, systemTable.PrefixEnd(), 0 /* maxRows */) + require.NoError(t, err) + for _, row := range rows { + rem, _, err := keys.DecodeTenantPrefix(row.Key) + require.NoError(t, err) + tenantKey := append(fakeTenantPrefix, rem...) + row.Value.ClearChecksum() + row.Value.Timestamp = hlc.Timestamp{} + require.NoError(t, db.Put(ctx, tenantKey, row.Value)) + } + } + checkSettingsValuesMatch := func(a, b *cluster.Settings) error { + for _, k := range settings.Keys() { + s, ok := settings.Lookup(k, settings.LookupForLocalAccess) + require.True(t, ok) + if av, bv := s.String(&a.SV), s.String(&b.SV); av != bv { + return errors.Errorf("values do not match for %s: %s != %s", k, av, bv) + } + } + return nil + } + for k, v := range toSet { + tdb.Exec(t, "SET CLUSTER SETTING "+k+" = $1", v[0]) + } + copySettingsFromSystemToFakeTenant() + s0 := tc.Server(0) + fakeSettings := cluster.MakeTestingClusterSettings() + sw := settingswatcher.New(s0.Clock(), fakeCodec, fakeSettings, + s0.ExecutorConfig().(sql.ExecutorConfig).RangeFeedFactory, + tc.Stopper()) + require.NoError(t, sw.Start(ctx)) + require.NoError(t, checkSettingsValuesMatch(s0.ClusterSettings(), fakeSettings)) + for k, v := range toSet { + tdb.Exec(t, "SET CLUSTER SETTING "+k+" = $1", v[1]) + } + copySettingsFromSystemToFakeTenant() + 
testutils.SucceedsSoon(t, func() error { + return checkSettingsValuesMatch(s0.ClusterSettings(), fakeSettings) + }) +} diff --git a/pkg/server/settingsworker.go b/pkg/server/settingsworker.go index 63a62c0bdab4..2bbaa593b0d4 100644 --- a/pkg/server/settingsworker.go +++ b/pkg/server/settingsworker.go @@ -16,15 +16,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/settingswatcher" "github.com/cockroachdb/cockroach/pkg/settings" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" - "github.com/cockroachdb/cockroach/pkg/sql/rowenc" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -34,72 +29,18 @@ func processSystemConfigKVs( ) error { tbl := systemschema.SettingsTable - a := &rowenc.DatumAlloc{} codec := keys.TODOSQLCodec settingsTablePrefix := codec.TablePrefix(uint32(tbl.GetID())) - colIdxMap := catalog.ColumnIDToOrdinalMap(tbl.PublicColumns()) + dec := settingswatcher.MakeRowDecoder(codec) var settingsKVs []roachpb.KeyValue processKV := func(ctx context.Context, kv roachpb.KeyValue, u settings.Updater) error { if !bytes.HasPrefix(kv.Key, settingsTablePrefix) { return nil } - - var k, v, t string - // First we need to decode the setting name field from the index key. 
- { - types := []*types.T{tbl.PublicColumns()[0].GetType()} - nameRow := make([]rowenc.EncDatum, 1) - _, matches, _, err := rowenc.DecodeIndexKey(codec, tbl, tbl.GetPrimaryIndex().IndexDesc(), types, nameRow, nil, kv.Key) - if err != nil { - return errors.Wrap(err, "failed to decode key") - } - if !matches { - return errors.Errorf("unexpected non-settings KV with settings prefix: %v", kv.Key) - } - if err := nameRow[0].EnsureDecoded(types[0], a); err != nil { - return err - } - k = string(tree.MustBeDString(nameRow[0].Datum)) - } - - // The rest of the columns are stored as a family, packed with diff-encoded - // column IDs followed by their values. - { - // column valueType can be null (missing) so we default it to "s". - t = "s" - bytes, err := kv.Value.GetTuple() - if err != nil { - return err - } - var colIDDiff uint32 - var lastColID descpb.ColumnID - var res tree.Datum - for len(bytes) > 0 { - _, _, colIDDiff, _, err = encoding.DecodeValueTag(bytes) - if err != nil { - return err - } - colID := lastColID + descpb.ColumnID(colIDDiff) - lastColID = colID - if idx, ok := colIdxMap.Get(colID); ok { - res, bytes, err = rowenc.DecodeTableValue(a, tbl.PublicColumns()[idx].GetType(), bytes) - if err != nil { - return err - } - switch colID { - case tbl.PublicColumns()[1].GetID(): // value - v = string(tree.MustBeDString(res)) - case tbl.PublicColumns()[3].GetID(): // valueType - t = string(tree.MustBeDString(res)) - case tbl.PublicColumns()[2].GetID(): // lastUpdated - // TODO(dt): we could decode just the len and then seek `bytes` past - // it, without allocating/decoding the unused timestamp. 
- default: - return errors.Errorf("unknown column: %v", colID) - } - } - } + k, v, t, _, err := dec.DecodeRow(kv) + if err != nil { + return err } settingsKVs = append(settingsKVs, kv) From fdbcdcf43c36ddfb0832750378933e211307655f Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 30 Dec 2020 00:50:44 -0500 Subject: [PATCH 5/8] server: hook up settingswatcher to sql server Release note: None --- pkg/server/server_sql.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index df4a47364826..046d5d28236c 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/diagnostics" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/server/settingswatcher" "github.com/cockroachdb/cockroach/pkg/server/status" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/hydratedtables" @@ -110,6 +111,10 @@ type SQLServer struct { metricsRegistry *metric.Registry diagnosticsReporter *diagnostics.Reporter + // settingsWatcher is utilized by secondary tenants to watch for settings + // changes. It is nil on the system tenant. + settingsWatcher *settingswatcher.SettingsWatcher + // pgL is the shared RPC/SQL listener, opened when RPC was initialized. 
pgL net.Listener // connManager is the connection manager to use to set up additional @@ -679,6 +684,13 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { reporter.TestingKnobs = &cfg.TestingKnobs.Server.(*TestingKnobs).DiagnosticsTestingKnobs } + var settingsWatcher *settingswatcher.SettingsWatcher + if !codec.ForSystemTenant() { + settingsWatcher = settingswatcher.New( + cfg.clock, codec, cfg.Settings, cfg.rangeFeedFactory, cfg.stopper, + ) + } + return &SQLServer{ stopper: cfg.stopper, sqlIDContainer: cfg.nodeIDContainer, @@ -699,6 +711,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { sqlLivenessProvider: cfg.sqlLivenessProvider, metricsRegistry: cfg.registry, diagnosticsReporter: reporter, + settingsWatcher: settingsWatcher, }, nil } @@ -810,6 +823,12 @@ func (s *SQLServer) preStart( bootstrapVersion = clusterversion.ByKey(clusterversion.Start20_2) } + if s.settingsWatcher != nil { + if err := s.settingsWatcher.Start(ctx); err != nil { + return errors.Wrap(err, "initializing settings") + } + } + // Run startup migrations (note: these depend on jobs subsystem running). if err := sqlmigrationsMgr.EnsureMigrations(ctx, bootstrapVersion); err != nil { return errors.Wrap(err, "ensuring SQL migrations") From 77e41a3d078a9f90f95aa03785ac55a12192199e Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 15 Dec 2020 00:09:30 +0000 Subject: [PATCH 6/8] sql: remove limitations on tenants setting cluster settings This change also removes the limits in the SET CLUSTER SETTING code path that prevented tenants writing to their settings table before those changes would have had any effect. Release note: none. 
--- pkg/ccl/serverccl/BUILD.bazel | 2 -- pkg/ccl/serverccl/server_sql_test.go | 7 +++-- pkg/sql/exec_util.go | 6 ---- pkg/sql/set_cluster_setting.go | 45 +++++----------------------- 4 files changed, 12 insertions(+), 48 deletions(-) diff --git a/pkg/ccl/serverccl/BUILD.bazel b/pkg/ccl/serverccl/BUILD.bazel index d2c23790a03e..baaa36649b37 100644 --- a/pkg/ccl/serverccl/BUILD.bazel +++ b/pkg/ccl/serverccl/BUILD.bazel @@ -27,7 +27,6 @@ go_test( "//pkg/server", "//pkg/server/serverpb", "//pkg/sql", - "//pkg/sql/pgwire/pgcode", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", @@ -37,7 +36,6 @@ go_test( "//pkg/util/log", "//pkg/util/randutil", "//pkg/util/timeutil", - "@com_github_lib_pq//:pq", "@com_github_stretchr_testify//require", "@org_golang_x_crypto//bcrypt", ], diff --git a/pkg/ccl/serverccl/server_sql_test.go b/pkg/ccl/serverccl/server_sql_test.go index 8276f065ecdb..9d8b18ae6fe5 100644 --- a/pkg/ccl/serverccl/server_sql_test.go +++ b/pkg/ccl/serverccl/server_sql_test.go @@ -10,7 +10,6 @@ package serverccl import ( "context" - "errors" "io/ioutil" "net/http" "testing" @@ -20,13 +19,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/lib/pq" "github.com/stretchr/testify/require" ) @@ -77,10 +74,14 @@ func TestTenantCannotSetClusterSetting(t *testing.T) { _, db := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10), AllowSettingClusterSettings: false}) defer db.Close() _, err := db.Exec(`SET CLUSTER SETTING sql.defaults.vectorize=off`) + 
require.NoError(t, err) + /* TODO(dt): re-introduce when system-settings are prevented in tenants. + _, err = db.Exec(`SET CLUSTER SETTING kv.snapshot_rebalance.max_rate = '2MiB';`) var pqErr *pq.Error ok := errors.As(err, &pqErr) require.True(t, ok, "expected err to be a *pq.Error but is of type %T. error is: %v", err) require.Equal(t, pq.ErrorCode(pgcode.InsufficientPrivilege.String()), pqErr.Code, "err %v has unexpected code", err) + */ } func TestTenantUnauthenticatedAccess(t *testing.T) { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 7a7e2f50c4e9..27488f06eda2 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -965,12 +965,6 @@ var _ base.ModuleTestingKnobs = &TenantTestingKnobs{} // ModuleTestingKnobs implements the base.ModuleTestingKnobs interface. func (*TenantTestingKnobs) ModuleTestingKnobs() {} -// CanSetClusterSettings is a helper method that returns whether the tenant can -// set in-memory cluster settings. -func (k *TenantTestingKnobs) CanSetClusterSettings() bool { - return k != nil && k.ClusterSettingsUpdater != nil -} - // BackupRestoreTestingKnobs contains knobs for backup and restore behavior. type BackupRestoreTestingKnobs struct { // AllowImplicitAccess allows implicit access to data sources for non-admin diff --git a/pkg/sql/set_cluster_setting.go b/pkg/sql/set_cluster_setting.go index 9022229cbfd4..df9c712b1ad6 100644 --- a/pkg/sql/set_cluster_setting.go +++ b/pkg/sql/set_cluster_setting.go @@ -77,12 +77,6 @@ func checkPrivilegesForSetting(ctx context.Context, p *planner, name string, act func (p *planner) SetClusterSetting( ctx context.Context, n *tree.SetClusterSetting, ) (planNode, error) { - if !p.execCfg.TenantTestingKnobs.CanSetClusterSettings() && !p.execCfg.Codec.ForSystemTenant() { - // Setting cluster settings is disabled for phase 2 tenants if a test does - // not explicitly allow for setting in-memory cluster settings. 
- return nil, pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can SET CLUSTER SETTING") - } - name := strings.ToLower(n.Name) st := p.EvalContext().Settings v, ok := settings.Lookup(name, settings.LookupForLocalAccess) @@ -99,12 +93,6 @@ func (p *planner) SetClusterSetting( return nil, errors.AssertionFailedf("expected writable setting, got %T", v) } - if _, ok := setting.(*settings.VersionSetting); ok && p.execCfg.TenantTestingKnobs.CanSetClusterSettings() { - // A tenant that is allowed to set in-memory cluster settings is - // attempting to set the cluster version setting, which is disallowed. - return nil, pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can set version settings") - } - var value tree.TypedExpr if n.Value != nil { // For DEFAULT, let the value reference be nil. That's a RESET in disguise. @@ -174,31 +162,6 @@ func (n *setClusterSettingNode) startExec(params runParams) error { if !params.p.ExtendedEvalContext().TxnImplicit { return errors.Errorf("SET CLUSTER SETTING cannot be used inside a transaction") } - - if !params.p.execCfg.Codec.ForSystemTenant() { - // Sanity check that this tenant is able to set in-memory settings. 
- if !params.p.execCfg.TenantTestingKnobs.CanSetClusterSettings() { - return errors.Errorf("tenants cannot set cluster settings, this permission should have been checked at plan time") - } - var encodedValue string - if n.value == nil { - encodedValue = n.setting.EncodedDefault() - } else { - value, err := n.value.Eval(params.p.EvalContext()) - if err != nil { - return err - } - if _, ok := n.setting.(*settings.VersionSetting); ok { - return errors.Errorf("tenants cannot change cluster version setting, this should've been checked at plan time") - } - encodedValue, err = toSettingString(params.ctx, n.st, n.name, n.setting, value, nil /* prev */) - if err != nil { - return err - } - } - return params.p.execCfg.TenantTestingKnobs.ClusterSettingsUpdater.Set(n.name, encodedValue, n.setting.Typ()) - } - execCfg := params.extendedEvalCtx.ExecCfg var expectedEncodedValue string if err := execCfg.DB.Txn(params.ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -272,6 +235,14 @@ func (n *setClusterSettingNode) startExec(params runParams) error { ); err != nil { return err } + + if params.p.execCfg.TenantTestingKnobs != nil { + if err := params.p.execCfg.TenantTestingKnobs.ClusterSettingsUpdater.Set( + n.name, encoded, n.setting.Typ(), + ); err != nil { + return err + } + } } // Report tracked cluster settings via telemetry. From be2ed809d8458a69f14606e472eca945b3fe035b Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 15 Dec 2020 20:56:17 +0000 Subject: [PATCH 7/8] settings: add SystemOnly flag This change adds a flag to indicate a setting is only applicable to the system tenant, such as those that control storage layer behaviors, and thus should not be set by guest tenants (where it would be ignored). Release note: none. 
--- pkg/ccl/serverccl/BUILD.bazel | 2 ++ pkg/ccl/serverccl/server_sql_test.go | 5 +++-- pkg/kv/kvserver/store_snapshot.go | 4 ++-- pkg/settings/bool.go | 9 +++++++++ pkg/settings/byte_size.go | 6 ++++++ pkg/settings/duration.go | 9 +++++++++ pkg/settings/enum.go | 9 +++++++++ pkg/settings/float.go | 9 +++++++++ pkg/settings/int.go | 9 +++++++++ pkg/settings/masked.go | 5 +++++ pkg/settings/setting.go | 8 ++++++++ pkg/sql/set_cluster_setting.go | 5 +++++ 12 files changed, 76 insertions(+), 4 deletions(-) diff --git a/pkg/ccl/serverccl/BUILD.bazel b/pkg/ccl/serverccl/BUILD.bazel index baaa36649b37..d2c23790a03e 100644 --- a/pkg/ccl/serverccl/BUILD.bazel +++ b/pkg/ccl/serverccl/BUILD.bazel @@ -27,6 +27,7 @@ go_test( "//pkg/server", "//pkg/server/serverpb", "//pkg/sql", + "//pkg/sql/pgwire/pgcode", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", @@ -36,6 +37,7 @@ go_test( "//pkg/util/log", "//pkg/util/randutil", "//pkg/util/timeutil", + "@com_github_lib_pq//:pq", "@com_github_stretchr_testify//require", "@org_golang_x_crypto//bcrypt", ], diff --git a/pkg/ccl/serverccl/server_sql_test.go b/pkg/ccl/serverccl/server_sql_test.go index 9d8b18ae6fe5..f66df08503ac 100644 --- a/pkg/ccl/serverccl/server_sql_test.go +++ b/pkg/ccl/serverccl/server_sql_test.go @@ -10,6 +10,7 @@ package serverccl import ( "context" + "errors" "io/ioutil" "net/http" "testing" @@ -19,11 +20,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/lib/pq" "github.com/stretchr/testify/require" ) @@ -75,13 +78,11 @@ func 
TestTenantCannotSetClusterSetting(t *testing.T) { defer db.Close() _, err := db.Exec(`SET CLUSTER SETTING sql.defaults.vectorize=off`) require.NoError(t, err) - /* TODO(dt): re-introduce when system-settings are prevented in tenants. _, err = db.Exec(`SET CLUSTER SETTING kv.snapshot_rebalance.max_rate = '2MiB';`) var pqErr *pq.Error ok := errors.As(err, &pqErr) require.True(t, ok, "expected err to be a *pq.Error but is of type %T. error is: %v", err) require.Equal(t, pq.ErrorCode(pgcode.InsufficientPrivilege.String()), pqErr.Code, "err %v has unexpected code", err) - */ } func TestTenantUnauthenticatedAccess(t *testing.T) { diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index e649c45d2b17..5e8d4f427df1 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -857,7 +857,7 @@ var rebalanceSnapshotRate = settings.RegisterByteSizeSetting( "the rate limit (bytes/sec) to use for rebalance and upreplication snapshots", envutil.EnvOrDefaultBytes("COCKROACH_PREEMPTIVE_SNAPSHOT_RATE", 8<<20), validatePositive, -).WithPublic() +).WithPublic().WithSystemOnly() // recoverySnapshotRate is the rate at which Raft-initiated spanshots can be // sent. Ideally, one would never see a Raft-initiated snapshot; we'd like all @@ -870,7 +870,7 @@ var recoverySnapshotRate = settings.RegisterByteSizeSetting( "the rate limit (bytes/sec) to use for recovery snapshots", envutil.EnvOrDefaultBytes("COCKROACH_RAFT_SNAPSHOT_RATE", 8<<20), validatePositive, -).WithPublic() +).WithPublic().WithSystemOnly() // snapshotSenderBatchSize is the size that key-value batches are allowed to // grow to during Range snapshots before being sent to the receiver. 
This limit diff --git a/pkg/settings/bool.go b/pkg/settings/bool.go index ce2af73756b9..31597303e203 100644 --- a/pkg/settings/bool.go +++ b/pkg/settings/bool.go @@ -82,6 +82,15 @@ func (b *BoolSetting) WithPublic() *BoolSetting { return b } +// WithSystemOnly marks this setting as system-only and can be chained. +func (b *BoolSetting) WithSystemOnly() *BoolSetting { + b.common.systemOnly = true + return b +} + +// Defeat the linter. +var _ = (*BoolSetting).WithSystemOnly + // RegisterBoolSetting defines a new setting with type bool. func RegisterBoolSetting(key, desc string, defaultValue bool) *BoolSetting { setting := &BoolSetting{defaultValue: defaultValue} diff --git a/pkg/settings/byte_size.go b/pkg/settings/byte_size.go index 210afcb4ff83..1c88056b7ce3 100644 --- a/pkg/settings/byte_size.go +++ b/pkg/settings/byte_size.go @@ -39,6 +39,12 @@ func (b *ByteSizeSetting) WithPublic() *ByteSizeSetting { return b } +// WithSystemOnly marks this setting as system-only and can be chained. +func (b *ByteSizeSetting) WithSystemOnly() *ByteSizeSetting { + b.common.systemOnly = true + return b +} + // RegisterByteSizeSetting defines a new setting with type bytesize and any // supplied validation function(s). func RegisterByteSizeSetting( diff --git a/pkg/settings/duration.go b/pkg/settings/duration.go index 38545e86bd4a..3fd1d972331a 100644 --- a/pkg/settings/duration.go +++ b/pkg/settings/duration.go @@ -111,6 +111,15 @@ func (d *DurationSetting) WithPublic() *DurationSetting { return d } +// WithSystemOnly marks this setting as system-only and can be chained. +func (d *DurationSetting) WithSystemOnly() *DurationSetting { + d.common.systemOnly = true + return d +} + +// Defeat the linter. +var _ = (*DurationSetting).WithSystemOnly + // RegisterDurationSetting defines a new setting with type duration. 
func RegisterDurationSetting( key, desc string, defaultValue time.Duration, validateFns ...func(time.Duration) error, diff --git a/pkg/settings/enum.go b/pkg/settings/enum.go index 69d0a0d843e9..c63993accd23 100644 --- a/pkg/settings/enum.go +++ b/pkg/settings/enum.go @@ -109,6 +109,15 @@ func (e *EnumSetting) WithPublic() *EnumSetting { return e } +// WithSystemOnly indicates system-usage only and can be chained. +func (e *EnumSetting) WithSystemOnly() *EnumSetting { + e.common.systemOnly = true + return e +} + +// Defeat the linter. +var _ = (*EnumSetting).WithSystemOnly + // RegisterEnumSetting defines a new setting with type int. func RegisterEnumSetting( key, desc string, defaultValue string, enumValues map[int64]string, diff --git a/pkg/settings/float.go b/pkg/settings/float.go index 90e39481bb93..b623c3026096 100644 --- a/pkg/settings/float.go +++ b/pkg/settings/float.go @@ -94,6 +94,15 @@ func (f *FloatSetting) setToDefault(sv *Values) { } } +// WithSystemOnly indicates system-usage only and can be chained. +func (f *FloatSetting) WithSystemOnly() *FloatSetting { + f.common.systemOnly = true + return f +} + +// Defeat the linter. +var _ = (*FloatSetting).WithSystemOnly + // RegisterFloatSetting defines a new setting with type float. func RegisterFloatSetting( key, desc string, defaultValue float64, validateFns ...func(float64) error, diff --git a/pkg/settings/int.go b/pkg/settings/int.go index 1486e3d645df..b6918f1cce59 100644 --- a/pkg/settings/int.go +++ b/pkg/settings/int.go @@ -123,6 +123,15 @@ func (i *IntSetting) WithPublic() *IntSetting { return i } +// WithSystemOnly indicates system-usage only and can be chained. +func (i *IntSetting) WithSystemOnly() *IntSetting { + i.common.systemOnly = true + return i +} + +// Defeat the linter.
+var _ = (*IntSetting).WithSystemOnly + // PositiveInt can be passed to RegisterIntSetting func PositiveInt(v int64) error { if v < 1 { diff --git a/pkg/settings/masked.go b/pkg/settings/masked.go index b67639ca74c1..cc4ebba774a8 100644 --- a/pkg/settings/masked.go +++ b/pkg/settings/masked.go @@ -47,3 +47,8 @@ func (s *MaskedSetting) Description() string { func (s *MaskedSetting) Typ() string { return s.setting.Typ() } + +// SystemOnly returns the underlying setting's SystemOnly. +func (s *MaskedSetting) SystemOnly() bool { + return s.setting.SystemOnly() +} diff --git a/pkg/settings/setting.go b/pkg/settings/setting.go index 4e7ea499021e..3bc3a5140127 100644 --- a/pkg/settings/setting.go +++ b/pkg/settings/setting.go @@ -201,6 +201,9 @@ type Setting interface { // Reserved settings are still accessible to users, but they don't get // listed out when retrieving all settings. Visibility() Visibility + + // SystemOnly indicates if a setting is only applicable to the system tenant. + SystemOnly() bool } // WritableSetting is the exported interface of non-masked settings. @@ -263,6 +266,7 @@ const ( type common struct { description string visibility Visibility + systemOnly bool // Each setting has a slotIdx which is used as a handle with Values. 
slotIdx int nonReportable bool @@ -298,6 +302,10 @@ func (i common) Visibility() Visibility { return i.visibility } +func (i common) SystemOnly() bool { + return i.systemOnly +} + func (i common) isReportable() bool { return !i.nonReportable } diff --git a/pkg/sql/set_cluster_setting.go b/pkg/sql/set_cluster_setting.go index df9c712b1ad6..15f4ad795e40 100644 --- a/pkg/sql/set_cluster_setting.go +++ b/pkg/sql/set_cluster_setting.go @@ -93,6 +93,11 @@ func (p *planner) SetClusterSetting( return nil, errors.AssertionFailedf("expected writable setting, got %T", v) } + if setting.SystemOnly() && !p.execCfg.Codec.ForSystemTenant() { + return nil, pgerror.Newf(pgcode.InsufficientPrivilege, + "setting %s is only settable in the system tenant", name) + } + var value tree.TypedExpr if n.Value != nil { // For DEFAULT, let the value reference be nil. That's a RESET in disguise. From 7573b7f93bf48d733e669a55fa3f69a033862178 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 15 Dec 2020 21:00:33 +0000 Subject: [PATCH 8/8] cli/gen,Makefile: generate list of tenant-ok settings This adds a generated listing of the settings available to tenants, i.e. settings which are not system-only. This generated listing is checked in so that reviewers will notice if a new setting is there that should not be presented to tenants. Release note: none. 
--- Makefile | 15 +-- build/variables.mk | 2 +- .../settings/settings-for-tenants.txt | 101 ++++++++++++++++++ pkg/cli/gen.go | 14 ++- 4 files changed, 124 insertions(+), 8 deletions(-) create mode 100644 docs/generated/settings/settings-for-tenants.txt diff --git a/Makefile b/Makefile index eba6192af7c4..486402d64f01 100644 --- a/Makefile +++ b/Makefile @@ -971,7 +971,13 @@ $(go-targets): override LINKFLAGS += \ $(COCKROACH) $(COCKROACHOSS) go-install: override LINKFLAGS += \ -X "github.com/cockroachdb/cockroach/pkg/build.utcTime=$(shell date -u '+%Y/%m/%d %H:%M:%S')" -SETTINGS_DOC_PAGE := docs/generated/settings/settings.html +docs/generated/settings/settings.html: $(settings-doc-gen) + @$(settings-doc-gen) gen settings-list --format=html > $@ + +docs/generated/settings/settings-for-tenants.txt: $(settings-doc-gen) + @$(settings-doc-gen) gen settings-list --without-system-only > $@ + +SETTINGS_DOC_PAGES := docs/generated/settings/settings.html docs/generated/settings/settings-for-tenants.txt # Note: We pass `-v` to `go build` and `go test -i` so that warnings # from the linker aren't suppressed. The usage of `-v` also shows when @@ -1002,7 +1008,7 @@ build: $(COCKROACH) buildoss: $(COCKROACHOSS) buildshort: $(COCKROACHSHORT) build buildoss buildshort: $(if $(is-cross-compile),,$(DOCGEN_TARGETS)) -build buildshort: $(if $(is-cross-compile),,$(SETTINGS_DOC_PAGE)) +build buildshort: $(if $(is-cross-compile),,$(SETTINGS_DOC_PAGES)) # For historical reasons, symlink cockroach to cockroachshort. # TODO(benesch): see if it would break anyone's workflow to remove this. @@ -1139,7 +1145,7 @@ dupl: bin/.bootstrap .PHONY: generate generate: ## Regenerate generated code. 
-generate: protobuf $(DOCGEN_TARGETS) $(OPTGEN_TARGETS) $(LOG_TARGETS) $(SQLPARSER_TARGETS) $(WKTPARSER_TARGETS) $(SETTINGS_DOC_PAGE) bin/langgen bin/terraformgen +generate: protobuf $(DOCGEN_TARGETS) $(OPTGEN_TARGETS) $(LOG_TARGETS) $(SQLPARSER_TARGETS) $(WKTPARSER_TARGETS) $(SETTINGS_DOC_PAGES) bin/langgen bin/terraformgen $(GO) generate $(GOFLAGS) $(GOMODVENDORFLAGS) -tags '$(TAGS)' -ldflags '$(LINKFLAGS)' $(PKG) $(MAKE) execgen @@ -1593,9 +1599,6 @@ pkg/util/log/log_channels_generated.go: pkg/util/log/gen.go pkg/util/log/logpb/l settings-doc-gen := $(if $(filter buildshort,$(MAKECMDGOALS)),$(COCKROACHSHORT),$(COCKROACH)) -$(SETTINGS_DOC_PAGE): $(settings-doc-gen) - @$(settings-doc-gen) gen settings-list --format=html > $@ - .PHONY: execgen execgen: ## Regenerate generated code for the vectorized execution engine. execgen: $(EXECGEN_TARGETS) bin/execgen diff --git a/build/variables.mk b/build/variables.mk index 4132d7c81c12..1f4ea56aca9b 100644 --- a/build/variables.mk +++ b/build/variables.mk @@ -120,7 +120,7 @@ define VALID_VARS PROTOC_DIR PROTO_MAPPINGS RACETIMEOUT - SETTINGS_DOC_PAGE + SETTINGS_DOC_PAGES SHELL SQLPARSER_TARGETS STARTFLAGS diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt new file mode 100644 index 000000000000..fcf3ef8b0800 --- /dev/null +++ b/docs/generated/settings/settings-for-tenants.txt @@ -0,0 +1,101 @@ +Setting Type Default Description +cloudstorage.gs.default.key string if set, JSON key to use during Google Cloud Storage operations +cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage +cloudstorage.timeout duration 10m0s the timeout for import/export storage operations +cluster.organization string organization name +cluster.preserve_downgrade_option string disable (automatic or manual) cluster version upgrade from the specified version until reset 
+diagnostics.forced_sql_stat_reset.interval duration 2h0m0s interval after which SQL statement statistics are refreshed even if not collected (should be more than diagnostics.sql_stat_reset.interval). It has a max value of 24H. +diagnostics.reporting.enabled boolean true enable reporting diagnostic metrics to cockroach labs +diagnostics.reporting.interval duration 1h0m0s interval at which diagnostics data should be reported +diagnostics.sql_stat_reset.interval duration 1h0m0s interval controlling how often SQL statement statistics should be reset (should be less than diagnostics.forced_sql_stat_reset.interval). It has a max value of 24H. +enterprise.license string the encoded cluster license +external.graphite.endpoint string if nonempty, push server metrics to the Graphite or Carbon server at the specified host:port +external.graphite.interval duration 10s the interval at which metrics are pushed to Graphite (if enabled) +feature.backup.enabled boolean true set to true to enable backups, false to disable; default is true +feature.changefeed.enabled boolean true set to true to enable changefeeds, false to disable; default is true +feature.export.enabled boolean true set to true to enable exports, false to disable; default is true +feature.import.enabled boolean true set to true to enable imports, false to disable; default is true +feature.restore.enabled boolean true set to true to enable restore, false to disable; default is true +feature.schema_change.enabled boolean true set to true to enable schema changes, false to disable; default is true +feature.stats.enabled boolean true set to true to enable CREATE STATISTICS/ANALYZE, false to disable; default is true +jobs.retention_time duration 336h0m0s the amount of time to retain records for completed jobs before +kv.allocator.load_based_lease_rebalancing.enabled boolean true set to enable rebalancing of range leases based on load and latency +kv.allocator.load_based_rebalancing enumeration leases and replicas 
whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2] +kv.allocator.qps_rebalance_threshold float 0.25 minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull +kv.allocator.range_rebalance_threshold float 0.05 minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull +kv.bulk_io_write.max_rate byte size 1.0 TiB the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops +kv.closed_timestamp.follower_reads_enabled boolean true allow (all) replicas to serve consistent historical reads based on closed timestamp information +kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records +kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated +kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting +kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled +kv.replication_reports.interval duration 1m0s the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable) +kv.transaction.max_intents_bytes integer 262144 maximum number of bytes used to track locks in transactions +kv.transaction.max_refresh_spans_bytes integer 256000 maximum number of bytes used to track refresh spans in serializable transactions +security.ocsp.mode enumeration off "use OCSP to check whether TLS certificates are revoked. If the OCSP +server is unreachable, in strict mode all certificates will be rejected +and in lax mode all certificates will be accepted. 
[off = 0, lax = 1, strict = 2]" +security.ocsp.timeout duration 3s timeout before considering the OCSP server unreachable +server.auth_log.sql_connections.enabled boolean false if set, log SQL client connect and disconnect events (note: may hinder performance on loaded nodes) +server.auth_log.sql_sessions.enabled boolean false if set, log SQL session login/disconnection events (note: may hinder performance on loaded nodes) +server.clock.forward_jump_check_enabled boolean false if enabled, forward clock jumps > max_offset/2 will cause a panic +server.clock.persist_upper_bound_interval duration 0s the interval between persisting the wall time upper bound of the clock. The clock does not generate a wall time greater than the persisted timestamp and will panic if it sees a wall time greater than this value. When cockroach starts, it waits for the wall time to catch-up till this persisted timestamp. This guarantees monotonic wall time across server restarts. Not setting this or setting a value of 0 disables this feature. +server.consistency_check.max_rate byte size 8.0 MiB the rate limit (bytes/sec) to use for consistency checks; used in conjunction with server.consistency_check.interval to control the frequency of consistency checks. Note that setting this too high can negatively impact performance. +server.eventlog.enabled boolean true if set, logged notable events are also stored in the table system.eventlog +server.eventlog.ttl duration 2160h0m0s if nonzero, entries in system.eventlog older than this duration are deleted every 10m0s. Should not be lowered below 24 hours. 
+server.host_based_authentication.configuration string host-based authentication configuration to use during connection authentication +server.oidc_authentication.autologin boolean false if true, logged-out visitors to the DB Console will be automatically redirected to the OIDC login endpoint (this feature is experimental) +server.oidc_authentication.button_text string Login with your OIDC provider text to show on button on DB Console login page to login with your OIDC provider (only shown if OIDC is enabled) (this feature is experimental) +server.oidc_authentication.claim_json_key string sets JSON key of principal to extract from payload after OIDC authentication completes (usually email or sid) (this feature is experimental) +server.oidc_authentication.client_id string sets OIDC client id (this feature is experimental) +server.oidc_authentication.client_secret string sets OIDC client secret (this feature is experimental) +server.oidc_authentication.enabled boolean false enables or disabled OIDC login for the DB Console (this feature is experimental) +server.oidc_authentication.principal_regex string (.+) regular expression to apply to extracted principal (see claim_json_key setting) to translate to SQL user (golang regex format, must include 1 grouping to extract) (this feature is experimental) +server.oidc_authentication.provider_url string sets OIDC provider URL ({provider_url}/.well-known/openid-configuration must resolve) (this feature is experimental) +server.oidc_authentication.redirect_url string https://localhost:8080/oidc/v1/callback sets OIDC redirect URL (base HTTP URL, likely your load balancer, must route to the path /oidc/v1/callback) (this feature is experimental) +server.oidc_authentication.scopes string openid sets OIDC scopes to include with authentication request (space delimited list of strings, required to start with `openid`) (this feature is experimental) +server.rangelog.ttl duration 720h0m0s if nonzero, range log entries older than this 
duration are deleted every 10m0s. Should not be lowered below 24 hours. +server.remote_debugging.mode string local set to enable remote debugging, localhost-only or disable (any, local, off) +server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with the rest of the shutdown process +server.shutdown.lease_transfer_wait duration 5s the amount of time a server waits to transfer range leases before proceeding with the rest of the shutdown process +server.shutdown.query_wait duration 10s the server will wait for at least this amount of time for active queries to finish +server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead +server.user_login.timeout duration 10s timeout after which client authentication times out if some system range is unavailable (0 = no timeout) +server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid +sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed +sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed +sql.cross_db_views.enabled boolean false if true, creating views that refer to other databases is allowed +sql.defaults.default_int_size integer 8 the size, in bytes, of an INT type +sql.defaults.disallow_full_table_scans.enabled boolean false setting to true rejects queries that have planned a full table scan +sql.defaults.results_buffer.size byte size 16 KiB default size of the buffer that accumulates results for a statement or a batch of statements before they are sent to the client. This can be overridden on an individual connection with the 'results_buffer_size' parameter. 
Note that auto-retries generally only happen while no results have been delivered to the client, so reducing this size can increase the number of retriable errors a client receives. On the other hand, increasing the buffer size can increase the delay until the client receives the first result row. Updating the setting only affects new connections. Setting to 0 disables any buffering. +sql.defaults.serial_normalization enumeration rowid default handling of SERIAL in table definitions [rowid = 0, virtual_sequence = 1, sql_sequence = 2] +sql.distsql.max_running_flows integer 500 maximum number of concurrent flows that can be run on a node +sql.log.slow_query.experimental_full_table_scans.enabled boolean false when set to true, statements that perform a full table/index scan will be logged to the slow query log even if they do not meet the latency threshold. Must have the slow query log enabled for this setting to have any effect. +sql.log.slow_query.internal_queries.enabled boolean false when set to true, internal queries which exceed the slow query log threshold are logged to a separate log. Must have the slow query log enabled for this setting to have any effect. +sql.log.slow_query.latency_threshold duration 0s when set to non-zero, log statements whose service latency exceeds the threshold to a secondary logger on each node +sql.metrics.statement_details.dump_to_logs boolean false dump collected statement statistics to node logs when periodically cleared +sql.metrics.statement_details.enabled boolean true collect per-statement query statistics +sql.metrics.statement_details.plan_collection.enabled boolean true periodically save a logical plan for each fingerprint +sql.metrics.statement_details.plan_collection.period duration 5m0s the time until a new logical plan is collected +sql.metrics.statement_details.threshold duration 0s minimum execution time to cause statement statistics to be collected. If configured, no transaction stats are collected. 
+sql.metrics.transaction_details.enabled boolean true collect per-application transaction statistics +sql.notices.enabled boolean true enable notices in the server/client protocol being sent +sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators +sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode +sql.stats.automatic_collection.fraction_stale_rows float 0.2 target fraction of stale rows per table that will trigger a statistics refresh +sql.stats.automatic_collection.min_stale_rows integer 500 target minimum number of stale rows per table that will trigger a statistics refresh +sql.stats.histogram_collection.enabled boolean true histogram collection mode +sql.stats.multi_column_collection.enabled boolean true multi-column statistics collection mode +sql.stats.post_events.enabled boolean false if set, an event is logged for every CREATE STATISTICS job +sql.temp_object_cleaner.cleanup_interval duration 30m0s how often to clean up orphaned temporary objects +sql.trace.log_statement_execute boolean false set to true to enable logging of executed statements +sql.trace.session_eventlog.enabled boolean false set to true to enable session tracing. Note that enabling this may have a non-trivial negative performance impact. +sql.trace.stmt.enable_threshold duration 0s duration beyond which all statements are traced (set to 0 to disable). This applies to individual statements within a transaction and is therefore finer-grained than sql.trace.txn.enable_threshold. +sql.trace.txn.enable_threshold duration 0s duration beyond which all transactions are traced (set to 0 to disable). This setting is coarser grained thansql.trace.stmt.enable_threshold because it applies to all statements within a transaction as well as client communication (e.g. retries). 
+timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere +timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. +timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. +trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests +trace.lightstep.token string if set, traces go to Lightstep using this token +trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set +version version 20.2-16 set the active cluster version in the format '<major>.<minor>' diff --git a/pkg/cli/gen.go b/pkg/cli/gen.go index ba3a93e57728..34eb31e9e226 100644 --- a/pkg/cli/gen.go +++ b/pkg/cli/gen.go @@ -180,8 +180,11 @@ The resulting key file will be 32 bytes (random key ID) + key_size in bytes. }, } +var includeReservedSettings bool +var excludeSystemSettings bool + var genSettingsListCmd = &cobra.Command{ - Use: "settings-list ", + Use: "settings-list", Short: "output a list of available cluster settings", Long: ` Output the list of cluster settings known to this binary. @@ -204,6 +207,11 @@ Output the list of cluster settings known to this binary. if !ok { panic(fmt.Sprintf("could not find setting %q", name)) } + + if excludeSystemSettings && setting.SystemOnly() { + continue + } + if setting.Visibility() != settings.Public { // We don't document non-public settings at this time.
continue @@ -271,6 +279,10 @@ func init() { "AES key size for encryption at rest (one of: 128, 192, 256)") genEncryptionKeyCmd.PersistentFlags().BoolVar(&overwriteKey, "overwrite", false, "Overwrite key if it exists") + genSettingsListCmd.PersistentFlags().BoolVar(&includeReservedSettings, "include-reserved", false, + "include undocumented 'reserved' settings") + genSettingsListCmd.PersistentFlags().BoolVar(&excludeSystemSettings, "without-system-only", false, + "do not list settings only applicable to system tenant") genCmd.AddCommand(genCmds...) }