Skip to content

Commit

Permalink
Merge #75711
Browse files Browse the repository at this point in the history
75711: multitenant: listen for setting overrides r=RaduBerinde a=RaduBerinde

#### settings: add EncodedValue proto, update tenant settings API

This commit consolidates multiple uses of encoded setting values (raw
value and type strings) into a `settings.EncodedValue` proto.

The tenant settings roachpb API (not used yet) is updated to use this.

Release note: None

#### multitenant: listen for setting overrides

This implements the tenant side code for setting overrides.
Specifically, the tenant connector now implements the
`OverridesMonitor` interface using the `TenantSettings` API.

The server side of this API is not yet implemented, so this commit
does not include end-to-end tests. Basic functionality is verified
through a unit test that mocks the server-side API.

Informs #73857.

Release note: None

Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
craig[bot] and RaduBerinde committed Feb 9, 2022
2 parents 432a383 + af5a690 commit 7c4e234
Show file tree
Hide file tree
Showing 34 changed files with 770 additions and 261 deletions.
1 change: 1 addition & 0 deletions docs/generated/http/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ genrule(
"//pkg/server/diagnostics/diagnosticspb:diagnosticspb_proto",
"//pkg/server/serverpb:serverpb_proto",
"//pkg/server/status/statuspb:statuspb_proto",
"//pkg/settings:settings_proto",
"//pkg/sql/catalog/catpb:catpb_proto",
"//pkg/sql/catalog/descpb:descpb_proto",
"//pkg/sql/schemachanger/scpb:scpb_proto",
Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/kvccl/kvtenantccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "kvtenantccl",
srcs = ["connector.go"],
srcs = [
"connector.go",
"setting_overrides.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl",
visibility = ["//visibility:public"],
deps = [
Expand All @@ -16,6 +19,7 @@ go_library(
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/server/serverpb",
"//pkg/settings",
"//pkg/spanconfig",
"//pkg/util/contextutil",
"//pkg/util/grpcutil",
Expand All @@ -38,6 +42,7 @@ go_test(
srcs = [
"connector_test.go",
"main_test.go",
"setting_overrides_test.go",
"tenant_kv_test.go",
"tenant_trace_test.go",
"tenant_upgrade_test.go",
Expand All @@ -60,6 +65,7 @@ go_test(
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/testutils",
Expand Down
73 changes: 54 additions & 19 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
Expand Down Expand Up @@ -63,13 +64,13 @@ func init() {
type Connector struct {
log.AmbientContext

tenantID roachpb.TenantID
rpcContext *rpc.Context
rpcRetryOptions retry.Options
rpcDialTimeout time.Duration // for testing
rpcDial singleflight.Group
defaultZoneCfg *zonepb.ZoneConfig
addrs []string
startupC chan struct{}

mu struct {
syncutil.RWMutex
Expand All @@ -78,6 +79,15 @@ type Connector struct {
systemConfig *config.SystemConfig
systemConfigChannels []chan<- struct{}
}

settingsMu struct {
syncutil.Mutex

allTenantOverrides map[string]settings.EncodedValue
specificOverrides map[string]settings.EncodedValue
// notifyCh receives an event when there are changes to overrides.
notifyCh chan struct{}
}
}

// client represents an RPC client that proxies to a KV instance.
Expand Down Expand Up @@ -117,14 +127,22 @@ var _ spanconfig.KVAccessor = (*Connector)(nil)
// NOTE: Calling Start will set cfg.RPCContext.ClusterID.
func NewConnector(cfg kvtenant.ConnectorConfig, addrs []string) *Connector {
cfg.AmbientCtx.AddLogTag("tenant-connector", nil)
return &Connector{
if cfg.TenantID.IsSystem() {
panic("TenantID not set")
}
c := &Connector{
tenantID: cfg.TenantID,
AmbientContext: cfg.AmbientCtx,
rpcContext: cfg.RPCContext,
rpcRetryOptions: cfg.RPCRetryOptions,
defaultZoneCfg: cfg.DefaultZoneConfig,
addrs: addrs,
startupC: make(chan struct{}),
}

c.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor)
c.settingsMu.allTenantOverrides = make(map[string]settings.EncodedValue)
c.settingsMu.specificOverrides = make(map[string]settings.EncodedValue)
return c
}

// connectorFactory implements kvtenant.ConnectorFactory.
Expand All @@ -140,26 +158,46 @@ func (connectorFactory) NewConnector(
// connect to a KV node. Start returns once the connector has determined the
// cluster's ID and set Connector.rpcContext.ClusterID.
func (c *Connector) Start(ctx context.Context) error {
startupC := c.startupC
gossipStartupCh := make(chan struct{})
settingsStartupCh := make(chan struct{})
bgCtx := c.AnnotateCtx(context.Background())
if err := c.rpcContext.Stopper.RunAsyncTask(bgCtx, "connector", func(ctx context.Context) {

if err := c.rpcContext.Stopper.RunAsyncTask(bgCtx, "connector-gossip", func(ctx context.Context) {
ctx = c.AnnotateCtx(ctx)
ctx, cancel := c.rpcContext.Stopper.WithCancelOnQuiesce(ctx)
defer cancel()
c.runGossipSubscription(ctx)
c.runGossipSubscription(ctx, gossipStartupCh)
}); err != nil {
return err
}
// Synchronously block until the first GossipSubscription event.
select {
case <-startupC:
return nil
case <-ctx.Done():
return ctx.Err()
if err := c.rpcContext.Stopper.RunAsyncTask(bgCtx, "connector-settings", func(ctx context.Context) {
ctx = c.AnnotateCtx(ctx)
ctx, cancel := c.rpcContext.Stopper.WithCancelOnQuiesce(ctx)
defer cancel()
c.runTenantSettingsSubscription(ctx, settingsStartupCh)
}); err != nil {
return err
}

// Block until we receive the first GossipSubscription event and the initial
// setting overrides.
for gossipStartupCh != nil || settingsStartupCh != nil {
select {
case <-gossipStartupCh:
gossipStartupCh = nil
case <-settingsStartupCh:
settingsStartupCh = nil
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

func (c *Connector) runGossipSubscription(ctx context.Context) {
// runGossipSubscription listens for gossip subscription events. It closes the
// given channel once the ClusterID gossip key has been handled.
// Exits when the context is done.
func (c *Connector) runGossipSubscription(ctx context.Context, startupCh chan struct{}) {
for ctx.Err() == nil {
client, err := c.getClient(ctx)
if err != nil {
Expand Down Expand Up @@ -198,9 +236,9 @@ func (c *Connector) runGossipSubscription(ctx context.Context) {

// Signal that startup is complete once the ClusterID gossip key has
// been handled.
if c.startupC != nil && e.PatternMatched == gossip.KeyClusterID {
close(c.startupC)
c.startupC = nil
if startupCh != nil && e.PatternMatched == gossip.KeyClusterID {
close(startupCh)
startupCh = nil
}
}
}
Expand Down Expand Up @@ -257,9 +295,6 @@ func (c *Connector) updateNodeAddress(ctx context.Context, key string, content r
// nothing ever removes them from Gossip.nodeDescs. Fix this.
c.mu.Lock()
defer c.mu.Unlock()
if c.mu.nodeDescs == nil {
c.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor)
}
c.mu.nodeDescs[desc.NodeID] = desc
}

Expand Down
24 changes: 16 additions & 8 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ var rpcRetryOpts = retry.Options{
var _ roachpb.InternalServer = &mockServer{}

type mockServer struct {
rangeLookupFn func(context.Context, *roachpb.RangeLookupRequest) (*roachpb.RangeLookupResponse, error)
gossipSubFn func(*roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer) error
rangeLookupFn func(context.Context, *roachpb.RangeLookupRequest) (*roachpb.RangeLookupResponse, error)
gossipSubFn func(*roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer) error
tenantSettingsFn func(request *roachpb.TenantSettingsRequest, server roachpb.Internal_TenantSettingsServer) error
}

func (m *mockServer) RangeLookup(
Expand All @@ -62,6 +63,19 @@ func (m *mockServer) GossipSubscription(
return m.gossipSubFn(req, stream)
}

func (m *mockServer) TenantSettings(
req *roachpb.TenantSettingsRequest, stream roachpb.Internal_TenantSettingsServer,
) error {
if m.tenantSettingsFn == nil {
return stream.Send(&roachpb.TenantSettingsEvent{
Precedence: roachpb.SpecificTenantOverrides,
Incremental: false,
Overrides: nil,
})
}
return m.tenantSettingsFn(req, stream)
}

func (*mockServer) ResetQuorum(
context.Context, *roachpb.ResetQuorumRequest,
) (*roachpb.ResetQuorumResponse, error) {
Expand Down Expand Up @@ -112,12 +126,6 @@ func (m *mockServer) UpdateSystemSpanConfigs(
panic("unimplemented")
}

func (m *mockServer) TenantSettings(
*roachpb.TenantSettingsRequest, roachpb.Internal_TenantSettingsServer,
) error {
panic("unimplemented")
}

func gossipEventForClusterID(clusterID uuid.UUID) *roachpb.GossipSubscriptionEvent {
return &roachpb.GossipSubscriptionEvent{
Key: gossip.KeyClusterID,
Expand Down
151 changes: 151 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/setting_overrides.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package kvtenantccl

import (
"context"
"io"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/errorspb"
)

// runTenantSettingsSubscription listens for tenant setting override changes.
// It closes the given channel once the initial set of overrides were obtained.
// Exits when the context is done.
func (c *Connector) runTenantSettingsSubscription(ctx context.Context, startupCh chan struct{}) {
for ctx.Err() == nil {
client, err := c.getClient(ctx)
if err != nil {
continue
}
stream, err := client.TenantSettings(ctx, &roachpb.TenantSettingsRequest{
TenantID: c.tenantID,
})
if err != nil {
log.Warningf(ctx, "error issuing TenantSettings RPC: %v", err)
c.tryForgetClient(ctx, client)
continue
}
for firstEventInStream := true; ; firstEventInStream = false {
e, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
// Soft RPC error. Drop client and retry.
log.Warningf(ctx, "error consuming TenantSettings RPC: %v", err)
c.tryForgetClient(ctx, client)
break
}
if e.Error != (errorspb.EncodedError{}) {
// Hard logical error. We expect io.EOF next.
log.Errorf(ctx, "error consuming TenantSettings RPC: %v", e.Error)
continue
}

if err := c.processSettingsEvent(e, firstEventInStream); err != nil {
log.Errorf(ctx, "error processing tenant settings event: %v", err)
_ = stream.CloseSend()
c.tryForgetClient(ctx, client)
break
}

// Signal that startup is complete once we receive an event.
if startupCh != nil {
close(startupCh)
startupCh = nil
}
}
}
}

// processSettingsEvent updates the setting overrides based on the event.
func (c *Connector) processSettingsEvent(
e *roachpb.TenantSettingsEvent, firstEventInStream bool,
) error {
if firstEventInStream && e.Incremental {
return errors.Newf("first event must not be Incremental")
}
c.settingsMu.Lock()
defer c.settingsMu.Unlock()

var m map[string]settings.EncodedValue
switch e.Precedence {
case roachpb.AllTenantsOverrides:
m = c.settingsMu.allTenantOverrides
case roachpb.SpecificTenantOverrides:
m = c.settingsMu.specificOverrides
default:
return errors.Newf("unknown precedence value %d", e.Precedence)
}

// If the event is not incremental, clear the map.
if !e.Incremental {
for k := range m {
delete(m, k)
}
}
// Merge in the override changes.
for _, o := range e.Overrides {
if o.Value == (settings.EncodedValue{}) {
// Empty value indicates that the override is removed.
delete(m, o.Name)
} else {
m[o.Name] = o.Value
}
}

// Do a non-blocking send on the notification channel (if it is not nil). This
// is a buffered channel and if it already contains a message, there's no
// point in sending a duplicate notification.
select {
case c.settingsMu.notifyCh <- struct{}{}:
default:
}

return nil
}

// RegisterOverridesChannel is part of the settingswatcher.OverridesMonitor
// interface.
func (c *Connector) RegisterOverridesChannel() <-chan struct{} {
c.settingsMu.Lock()
defer c.settingsMu.Unlock()
if c.settingsMu.notifyCh != nil {
panic(errors.AssertionFailedf("multiple calls not supported"))
}
ch := make(chan struct{}, 1)
// Send an initial message on the channel.
ch <- struct{}{}
c.settingsMu.notifyCh = ch
return ch
}

// Overrides is part of the settingswatcher.OverridesMonitor interface.
func (c *Connector) Overrides() map[string]settings.EncodedValue {
// We could be more efficient here, but we expect this function to be called
// only when there are changes (which should be rare).
res := make(map[string]settings.EncodedValue)
c.settingsMu.Lock()
defer c.settingsMu.Unlock()
// First copy the all-tenant overrides.
for name, val := range c.settingsMu.allTenantOverrides {
res[name] = val
}
// Then copy the specific overrides (which can overwrite some all-tenant
// overrides).
for name, val := range c.settingsMu.specificOverrides {
res[name] = val
}
return res
}
Loading

0 comments on commit 7c4e234

Please sign in to comment.