Skip to content

Commit

Permalink
Merge pull request #105602 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.1-105566
  • Loading branch information
knz authored Jun 27, 2023
2 parents d87785a + 1ff03a6 commit d30bce5
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 130 deletions.
44 changes: 41 additions & 3 deletions pkg/kv/kvclient/kvtenant/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ type connector struct {
defaultZoneCfg *zonepb.ZoneConfig
addrs []string

startCh chan struct{} // closed when connector has started up
startErr error

mu struct {
syncutil.RWMutex
client *client
Expand All @@ -172,9 +175,17 @@ type connector struct {
settingsMu struct {
syncutil.Mutex

allTenantOverrides map[string]settings.EncodedValue
specificOverrides map[string]settings.EncodedValue
// notifyCh receives an event when there are changes to overrides.
// receivedFirstAllTenantOverrides is set to true when the first batch of
// all-tenant overrides has been received.
receivedFirstAllTenantOverrides bool
allTenantOverrides map[string]settings.EncodedValue

// receivedFirstSpecificOverrides is set to true when the first batch of
// tenant-specific overrides has been received.
receivedFirstSpecificOverrides bool
specificOverrides map[string]settings.EncodedValue

// notifyCh is closed when there are changes to overrides.
notifyCh chan struct{}
}
}
Expand Down Expand Up @@ -247,6 +258,7 @@ func NewConnector(cfg ConnectorConfig, addrs []string) Connector {
c.mu.systemConfigChannels = make(map[chan<- struct{}]struct{})
c.settingsMu.allTenantOverrides = make(map[string]settings.EncodedValue)
c.settingsMu.specificOverrides = make(map[string]settings.EncodedValue)
c.settingsMu.notifyCh = make(chan struct{})
return c
}

Expand All @@ -268,10 +280,36 @@ func (connectorFactory) NewConnector(
return NewConnector(cfg, []string{addressConfig.LoopbackAddress}), nil
}

// WaitForStart waits until the connector has started.
func (c *connector) WaitForStart(ctx context.Context) error {
// Fast path check.
select {
case <-c.startCh:
return c.startErr
default:
}
if c.startCh == nil {
return errors.AssertionFailedf("Start() was not yet called")
}
select {
case <-c.startCh:
return c.startErr
case <-ctx.Done():
return ctx.Err()
}
}

// Start launches the connector's worker thread and waits for it to successfully
// connect to a KV node. Start returns once the connector has determined the
// cluster's ID and set connector.rpcContext.ClusterID.
func (c *connector) Start(ctx context.Context) error {
c.startCh = make(chan struct{})
c.startErr = c.internalStart(ctx)
close(c.startCh)
return c.startErr
}

func (c *connector) internalStart(ctx context.Context) error {
gossipStartupCh := make(chan struct{})
settingsStartupCh := make(chan struct{})
bgCtx := c.AnnotateCtx(context.Background())
Expand Down
16 changes: 14 additions & 2 deletions pkg/kv/kvclient/kvtenant/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,15 @@ func (m *mockServer) TenantSettings(
req *kvpb.TenantSettingsRequest, stream kvpb.Internal_TenantSettingsServer,
) error {
if m.tenantSettingsFn == nil {
if err := stream.Send(&kvpb.TenantSettingsEvent{
Precedence: kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES,
Incremental: false,
Overrides: nil,
}); err != nil {
return err
}
return stream.Send(&kvpb.TenantSettingsEvent{
Precedence: kvpb.SpecificTenantOverrides,
Precedence: kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES,
Incremental: false,
Overrides: nil,
})
Expand Down Expand Up @@ -263,7 +270,12 @@ func TestConnectorGossipSubscription(t *testing.T) {
gossipSubC <- gossipEventForNodeDesc(node1)
gossipSubC <- gossipEventForNodeDesc(node2)
gossipSubC <- gossipEventForClusterID(clusterID)
require.NoError(t, <-startedC)
select {
case err := <-startedC:
require.NoError(t, err)
case <-time.After(10 * time.Second):
t.Fatalf("failed to see start complete")
}

// Ensure that ClusterID was updated.
require.Equal(t, clusterID, rpcContext.StorageClusterID.Get())
Expand Down
77 changes: 35 additions & 42 deletions pkg/kv/kvclient/kvtenant/setting_overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (c *connector) runTenantSettingsSubscription(ctx context.Context, startupCh
c.tryForgetClient(ctx, client)
continue
}
for firstEventInStream := true; ; firstEventInStream = false {
for {
e, err := stream.Recv()
if err != nil {
if err == io.EOF {
Expand All @@ -55,40 +55,48 @@ func (c *connector) runTenantSettingsSubscription(ctx context.Context, startupCh
continue
}

if err := c.processSettingsEvent(e, firstEventInStream); err != nil {
settingsReady, err := c.processSettingsEvent(e)
if err != nil {
log.Errorf(ctx, "error processing tenant settings event: %v", err)
_ = stream.CloseSend()
c.tryForgetClient(ctx, client)
break
}

// Signal that startup is complete once we receive an event.
if startupCh != nil {
close(startupCh)
startupCh = nil
// Signal that startup is complete once we have enough events to start.
if settingsReady {
log.Infof(ctx, "received initial tenant settings")

if startupCh != nil {
close(startupCh)
startupCh = nil
}
}
}
}
}

// processSettingsEvent updates the setting overrides based on the event.
func (c *connector) processSettingsEvent(
e *kvpb.TenantSettingsEvent, firstEventInStream bool,
) error {
if firstEventInStream && e.Incremental {
return errors.Newf("first event must not be Incremental")
}
e *kvpb.TenantSettingsEvent,
) (settingsReady bool, err error) {
c.settingsMu.Lock()
defer c.settingsMu.Unlock()

if (!c.settingsMu.receivedFirstAllTenantOverrides || !c.settingsMu.receivedFirstSpecificOverrides) && e.Incremental {
return false, errors.Newf("need to receive non-incremental setting events first")
}

var m map[string]settings.EncodedValue
switch e.Precedence {
case kvpb.AllTenantsOverrides:
case kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES:
c.settingsMu.receivedFirstAllTenantOverrides = true
m = c.settingsMu.allTenantOverrides
case kvpb.SpecificTenantOverrides:
case kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES:
c.settingsMu.receivedFirstSpecificOverrides = true
m = c.settingsMu.specificOverrides
default:
return errors.Newf("unknown precedence value %d", e.Precedence)
return false, errors.Newf("unknown precedence value %d", e.Precedence)
}

// If the event is not incremental, clear the map.
Expand All @@ -107,39 +115,24 @@ func (c *connector) processSettingsEvent(
}
}

// Do a non-blocking send on the notification channel (if it is not nil). This
// is a buffered channel and if it already contains a message, there's no
// point in sending a duplicate notification.
select {
case c.settingsMu.notifyCh <- struct{}{}:
default:
}
// Notify watchers if any.
close(c.settingsMu.notifyCh)
// Define a new notification channel for subsequent watchers.
c.settingsMu.notifyCh = make(chan struct{})

return nil
}

// RegisterOverridesChannel is part of the settingswatcher.OverridesMonitor
// interface.
func (c *connector) RegisterOverridesChannel() <-chan struct{} {
c.settingsMu.Lock()
defer c.settingsMu.Unlock()
if c.settingsMu.notifyCh != nil {
panic(errors.AssertionFailedf("multiple calls not supported"))
}
ch := make(chan struct{}, 1)
// Send an initial message on the channel.
ch <- struct{}{}
c.settingsMu.notifyCh = ch
return ch
// The protocol defines that the server sends one initial
// non-incremental message for both precedences.
settingsReady = c.settingsMu.receivedFirstAllTenantOverrides && c.settingsMu.receivedFirstSpecificOverrides
return settingsReady, nil
}

// Overrides is part of the settingswatcher.OverridesMonitor interface.
func (c *connector) Overrides() map[string]settings.EncodedValue {
// We could be more efficient here, but we expect this function to be called
// only when there are changes (which should be rare).
res := make(map[string]settings.EncodedValue)
func (c *connector) Overrides() (map[string]settings.EncodedValue, <-chan struct{}) {
c.settingsMu.Lock()
defer c.settingsMu.Unlock()

res := make(map[string]settings.EncodedValue, len(c.settingsMu.allTenantOverrides)+len(c.settingsMu.specificOverrides))

// First copy the all-tenant overrides.
for name, val := range c.settingsMu.allTenantOverrides {
res[name] = val
Expand All @@ -149,5 +142,5 @@ func (c *connector) Overrides() map[string]settings.EncodedValue {
for name, val := range c.settingsMu.specificOverrides {
res[name] = val
}
return res
return res, c.settingsMu.notifyCh
}
58 changes: 37 additions & 21 deletions pkg/kv/kvclient/kvtenant/setting_overrides_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestConnectorSettingOverrides(t *testing.T) {
gossipSubFn := func(req *kvpb.GossipSubscriptionRequest, stream kvpb.Internal_GossipSubscriptionServer) error {
return stream.Send(gossipEventForClusterID(rpcContext.StorageClusterID.Get()))
}
eventCh := make(chan *kvpb.TenantSettingsEvent)
eventCh := make(chan *kvpb.TenantSettingsEvent, 2)
defer close(eventCh)
settingsFn := func(req *kvpb.TenantSettingsRequest, stream kvpb.Internal_TenantSettingsServer) error {
if req.TenantID != tenantID {
Expand Down Expand Up @@ -87,20 +87,33 @@ func TestConnectorSettingOverrides(t *testing.T) {
case <-time.After(10 * time.Millisecond):
}

ch := c.RegisterOverridesChannel()
// We should always get an initial notification.
waitForSettings(t, ch)

ev := &kvpb.TenantSettingsEvent{
Precedence: 1,
Precedence: kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES,
Incremental: false,
Overrides: nil,
}
eventCh <- ev
require.NoError(t, <-startedC)

waitForSettings(t, ch)
expectSettings(t, c, "foo=default bar=default baz=default")
select {
case err := <-startedC:
t.Fatalf("Start unexpectedly completed with err=%v", err)
case <-time.After(10 * time.Millisecond):
}

ev = &kvpb.TenantSettingsEvent{
Precedence: kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES,
Incremental: false,
Overrides: nil,
}
eventCh <- ev
select {
case err := <-startedC:
require.NoError(t, err)
case <-time.After(10 * time.Second):
t.Fatalf("failed to see start complete")
}

ch := expectSettings(t, c, "foo=default bar=default baz=default")

st := func(name, val string) kvpb.TenantSetting {
return kvpb.TenantSetting{
Expand All @@ -111,53 +124,53 @@ func TestConnectorSettingOverrides(t *testing.T) {

// Set some all-tenant overrides.
ev = &kvpb.TenantSettingsEvent{
Precedence: kvpb.AllTenantsOverrides,
Precedence: kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES,
Incremental: true,
Overrides: []kvpb.TenantSetting{st("foo", "all"), st("bar", "all")},
}
eventCh <- ev
waitForSettings(t, ch)
expectSettings(t, c, "foo=all bar=all baz=default")
ch = expectSettings(t, c, "foo=all bar=all baz=default")

// Set some tenant-specific overrides, with all-tenant overlap.
ev = &kvpb.TenantSettingsEvent{
Precedence: kvpb.SpecificTenantOverrides,
Precedence: kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES,
Incremental: true,
Overrides: []kvpb.TenantSetting{st("foo", "specific"), st("baz", "specific")},
}
eventCh <- ev
waitForSettings(t, ch)
expectSettings(t, c, "foo=specific bar=all baz=specific")
ch = expectSettings(t, c, "foo=specific bar=all baz=specific")

// Remove an all-tenant override that has a specific override.
ev = &kvpb.TenantSettingsEvent{
Precedence: kvpb.AllTenantsOverrides,
Precedence: kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES,
Incremental: true,
Overrides: []kvpb.TenantSetting{st("foo", "")},
}
eventCh <- ev
waitForSettings(t, ch)
expectSettings(t, c, "foo=specific bar=all baz=specific")
ch = expectSettings(t, c, "foo=specific bar=all baz=specific")

// Remove a specific override.
ev = &kvpb.TenantSettingsEvent{
Precedence: kvpb.SpecificTenantOverrides,
Precedence: kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES,
Incremental: true,
Overrides: []kvpb.TenantSetting{st("foo", "")},
}
eventCh <- ev
waitForSettings(t, ch)
expectSettings(t, c, "foo=default bar=all baz=specific")
ch = expectSettings(t, c, "foo=default bar=all baz=specific")

// Non-incremental change to all-tenants override.
ev = &kvpb.TenantSettingsEvent{
Precedence: kvpb.AllTenantsOverrides,
Precedence: kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES,
Incremental: true,
Overrides: []kvpb.TenantSetting{st("bar", "all")},
}
eventCh <- ev
waitForSettings(t, ch)
expectSettings(t, c, "foo=default bar=all baz=specific")
_ = expectSettings(t, c, "foo=default bar=all baz=specific")
}

func waitForSettings(t *testing.T, ch <-chan struct{}) {
Expand All @@ -169,14 +182,15 @@ func waitForSettings(t *testing.T, ch <-chan struct{}) {
t.Fatalf("waitForSettings timed out")
}
}
func expectSettings(t *testing.T, c *connector, exp string) {

func expectSettings(t *testing.T, c *connector, exp string) <-chan struct{} {
t.Helper()
vars := []string{"foo", "bar", "baz"}
values := make(map[string]string)
for i := range vars {
values[vars[i]] = "default"
}
overrides := c.Overrides()
overrides, updateCh := c.Overrides()
for _, v := range vars {
if val, ok := overrides[v]; ok {
values[v] = val.Value
Expand All @@ -190,4 +204,6 @@ func expectSettings(t *testing.T, c *connector, exp string) {
if str != exp {
t.Errorf("expected: %s got: %s", exp, str)
}

return updateCh
}
16 changes: 0 additions & 16 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1895,22 +1895,6 @@ func (s *ScanStats) String() string {
return redact.StringWithoutMarkers(s)
}

// TenantSettingsPrecedence identifies the precedence of a set of setting
// overrides. It is used by the TenantSettings API which supports passing
// multiple overrides for the same setting.
type TenantSettingsPrecedence uint32

const (
// SpecificTenantOverrides is the high precedence for tenant setting overrides.
// These overrides take precedence over AllTenantsOverrides.
SpecificTenantOverrides TenantSettingsPrecedence = 1 + iota

// AllTenantsOverrides is the low precedence for tenant setting overrides.
// These overrides are only effectual for a tenant if there is no override
// with the SpecificTenantOverrides precedence..
AllTenantsOverrides
)

// RangeFeedEventSink is an interface for sending a single rangefeed event.
type RangeFeedEventSink interface {
Context() context.Context
Expand Down
Loading

0 comments on commit d30bce5

Please sign in to comment.