Skip to content

Commit

Permalink
server/systemconfigwatcher: use rangefeeds to watch the system config
Browse files Browse the repository at this point in the history
This also adopts the rangefeed-backed cache for the optimizer and for
the reporter.

Fixes cockroachdb#70558
Fixes cockroachdb#74665

Release note: None
  • Loading branch information
ajwerner committed Jan 13, 2022
1 parent 5eee2d1 commit 484b405
Show file tree
Hide file tree
Showing 17 changed files with 440 additions and 10 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ ALL_TESTS = [
"//pkg/server/serverpb:serverpb_test",
"//pkg/server/settingswatcher:settingswatcher_test",
"//pkg/server/status:status_test",
"//pkg/server/systemconfigwatcher:systemconfigwatcher_test",
"//pkg/server/telemetry:telemetry_test",
"//pkg/server/tracedumper:tracedumper_test",
"//pkg/server:server_test",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# LogicTest: multiregion-9node-3region-3azs
# TODO(#69265): enable multiregion-9node-3region-3azs-tenant.

# Use short closed timestamp intervals to reduce the amount of time we need
# to wait for the system config to propagate.
statement ok
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms';
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms';

statement ok
CREATE DATABASE multi_region_test_db PRIMARY REGION "ca-central-1" REGIONS "ap-southeast-2", "us-east-1";
USE multi_region_test_db
Expand Down Expand Up @@ -187,7 +193,7 @@ ALTER TABLE history INJECT STATISTICS '[

# Regression test for #63735. Ensure that we choose locality optimized anti
# joins for the foreign key checks.
query T
query T retry
EXPLAIN INSERT
INTO
history (h_c_id, h_c_d_id, h_c_w_id, h_d_id, h_w_id, h_amount, h_date, h_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
# TODO(#69265): enable multiregion-9node-3region-3azs-tenant and/or revert
# the commit that split these changes out.

# Use short closed timestamp intervals to reduce the amount of time we need
# to wait for the system config to propagate.
statement ok
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms';
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms';

statement ok
CREATE DATABASE multi_region_test_db PRIMARY REGION "ca-central-1" REGIONS "ap-southeast-2", "us-east-1" SURVIVE REGION FAILURE;
USE multi_region_test_db
Expand All @@ -20,7 +26,7 @@ CREATE TABLE regional_by_row_table (
) LOCALITY REGIONAL BY ROW

# Do a REGEXP replace of the enums as these may not be static.
query T
query T retry
SELECT regexp_replace(info, '@\d+', '@<enum_val>', 'g') FROM
[EXPLAIN (OPT, CATALOG) SELECT * FROM regional_by_row_table]
----
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/server/systemconfigwatcher/systemconfigwatchertest",
"//pkg/sql",
"//pkg/sql/distsql",
"//pkg/sql/tests",
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/serverccl/server_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl/licenseccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher/systemconfigwatchertest"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -250,3 +251,8 @@ func TestNoInflightTracesVirtualTableOnTenant(t *testing.T) {
require.Error(t, err, "cluster_inflight_traces should be unsupported")
require.Contains(t, err.Error(), "table crdb_internal.cluster_inflight_traces is not implemented on tenants")
}

// TestSystemConfigWatcherCache runs the shared
// systemconfigwatchertest.TestSystemConfigWatcher suite from the serverccl
// test package, passing skipSecondary=false.
// NOTE(review): presumably skipSecondary=false is what enables the
// secondary-tenant portion of the shared suite here — confirm against
// systemconfigwatchertest.
func TestSystemConfigWatcherCache(t *testing.T) {
defer leaktest.AfterTest(t)()
systemconfigwatchertest.TestSystemConfigWatcher(t, false /* skipSecondary */)
}
14 changes: 14 additions & 0 deletions pkg/ccl/telemetryccl/testdata/telemetry/multiregion
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,14 @@ IMPORT INTO t7 CSV DATA ('nodelocal://0/t7/export*.csv')
sql.multiregion.import

# Test for locality optimized search counter.

# Lower the closed timestamp intervals so that system config updates are
# propagated rapidly.
exec
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms';
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '5ms';
----

feature-allowlist
sql.plan.opt.locality-optimized-search
----
Expand All @@ -393,6 +401,12 @@ USE survive_region;
CREATE TABLE t8 (a INT PRIMARY KEY) LOCALITY REGIONAL BY ROW
----

# Sleep a large multiple of the closed timestamp target duration to ensure
# that a fresh system config has made its way to the optimizer.
exec
SELECT pg_sleep(.05);
----

feature-usage
SELECT * FROM t8 WHERE a = 1
----
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/reports/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Reporter struct {
settings *cluster.Settings
storePool *kvserver.StorePool
executor sqlutil.InternalExecutor
cfgs config.SystemConfigProvider

frequencyMu struct {
syncutil.Mutex
Expand All @@ -89,6 +90,7 @@ func NewReporter(
st *cluster.Settings,
liveness *liveness.NodeLiveness,
executor sqlutil.InternalExecutor,
provider config.SystemConfigProvider,
) *Reporter {
r := Reporter{
db: db,
Expand All @@ -97,6 +99,7 @@ func NewReporter(
settings: st,
liveness: liveness,
executor: executor,
cfgs: provider,
}
r.frequencyMu.changeCh = make(chan struct{})
return &r
Expand Down Expand Up @@ -279,7 +282,7 @@ func (stats *Reporter) meta1LeaseHolderStore(ctx context.Context) *kvserver.Stor
}

// updateLatestConfig stores the current system config in stats.latestConfig.
// The value that ends up stored is the one returned by
// stats.cfgs.GetSystemConfig(), i.e. by the config.SystemConfigProvider held
// in the Reporter's cfgs field.
func (stats *Reporter) updateLatestConfig() {
stats.latestConfig = stats.meta1LeaseHolder.Gossip().GetSystemConfig()
stats.latestConfig = stats.cfgs.GetSystemConfig()
}

// nodeChecker checks whether a node is to be considered alive or not.
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ go_library(
"//pkg/server/settingswatcher",
"//pkg/server/status",
"//pkg/server/status/statuspb",
"//pkg/server/systemconfigwatcher",
"//pkg/server/telemetry",
"//pkg/server/tracedumper",
"//pkg/settings",
Expand Down
10 changes: 8 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/status/statuspb"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -714,8 +715,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer)
kvserver.RegisterPerStoreServer(grpcServer.Server, node.perReplicaServer)
ctpb.RegisterSideTransportServer(grpcServer.Server, ctReceiver)

systemConfigWatcher := systemconfigwatcher.New(
keys.SystemSQLCodec, clock, rangeFeedFactory, &cfg.DefaultZoneConfig,
)
replicationReporter := reports.NewReporter(
db, node.stores, storePool, st, nodeLiveness, internalExecutor)
db, node.stores, storePool, st, nodeLiveness, internalExecutor, systemConfigWatcher,
)

protectedtsReconciler := ptreconcile.NewReconciler(ptreconcile.Config{
Settings: st,
Expand Down Expand Up @@ -803,7 +809,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
runtime: runtimeSampler,
rpcContext: rpcContext,
nodeDescs: g,
systemConfigProvider: g,
systemConfigWatcher: systemConfigWatcher,
spanConfigAccessor: spanConfig.kvAccessor,
nodeDialer: nodeDialer,
distSender: distSender,
Expand Down
14 changes: 10 additions & 4 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/blobs"
"github.com/cockroachdb/cockroach/pkg/blobs/blobspb"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/featureflag"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand All @@ -48,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
Expand Down Expand Up @@ -148,6 +148,8 @@ type SQLServer struct {
spanconfigSQLWatcher *spanconfigsqlwatcher.SQLWatcher
settingsWatcher *settingswatcher.SettingsWatcher

systemConfigWatcher *systemconfigwatcher.Cache

// pgL is the shared RPC/SQL listener, opened when RPC was initialized.
pgL net.Listener
// connManager is the connection manager to use to set up additional
Expand Down Expand Up @@ -228,7 +230,7 @@ type sqlServerArgs struct {
nodeDescs kvcoord.NodeDescStore

// Used by the executor config.
systemConfigProvider config.SystemConfigProvider
systemConfigWatcher *systemconfigwatcher.Cache

// Used by the span config reconciliation job.
spanConfigAccessor spanconfig.KVAccessor
Expand Down Expand Up @@ -498,7 +500,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
hydratedTablesCache := hydratedtables.NewCache(cfg.Settings)
cfg.registry.AddMetricStruct(hydratedTablesCache.Metrics())

gcJobNotifier := gcjobnotifier.New(cfg.Settings, cfg.systemConfigProvider, codec, cfg.stopper)
gcJobNotifier := gcjobnotifier.New(cfg.Settings, cfg.systemConfigWatcher, codec, cfg.stopper)

var compactEngineSpanFunc tree.CompactEngineSpanFunc
if !codec.ForSystemTenant() {
Expand Down Expand Up @@ -641,7 +643,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
DB: cfg.db,
Gossip: cfg.gossip,
NodeLiveness: cfg.nodeLiveness,
SystemConfig: cfg.systemConfigProvider,
SystemConfig: cfg.systemConfigWatcher,
MetricsRecorder: cfg.recorder,
DistSender: cfg.distSender,
RPCContext: cfg.rpcContext,
Expand Down Expand Up @@ -970,6 +972,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
spanconfigSQLTranslator: spanConfig.sqlTranslator,
spanconfigSQLWatcher: spanConfig.sqlWatcher,
settingsWatcher: settingsWatcher,
systemConfigWatcher: cfg.systemConfigWatcher,
}, nil
}

Expand Down Expand Up @@ -1149,6 +1152,9 @@ func (s *SQLServer) preStart(
if err := s.settingsWatcher.Start(ctx); err != nil {
return errors.Wrap(err, "initializing settings")
}
// Start the rangefeed-backed system config cache. The error is wrapped with
// a message naming this watcher so that a failure here is not mistaken for
// a settingsWatcher startup failure.
if err := s.systemConfigWatcher.Start(ctx, s.stopper); err != nil {
return errors.Wrap(err, "initializing system config watcher")
}

// Run startup migrations (note: these depend on jobs subsystem running).
if err := startupMigrationsMgr.EnsureMigrations(ctx, bootstrapVersion); err != nil {
Expand Down
38 changes: 38 additions & 0 deletions pkg/server/systemconfigwatcher/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "systemconfigwatcher",
srcs = ["cache.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher",
visibility = ["//visibility:public"],
deps = [
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/keys",
"//pkg/kv/kvclient/rangefeed:with-mocks",
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/roachpb:with-mocks",
"//pkg/util/hlc",
"//pkg/util/stop",
"//pkg/util/syncutil",
],
)

go_test(
name = "systemconfigwatcher_test",
srcs = [
"cache_test.go",
"main_test.go",
],
embed = [":systemconfigwatcher"],
deps = [
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/systemconfigwatcher/systemconfigwatchertest",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
],
)
Loading

0 comments on commit 484b405

Please sign in to comment.