Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: notify tenant servers of metadata changes #105441

Merged
merged 5 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/kvtenant/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ go_library(
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvclient/rangecache",
"//pkg/kv/kvpb",
"//pkg/multitenant/mtinfopb",
"//pkg/multitenant/tenantcapabilities",
"//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/rpc/nodedialer",
Expand Down Expand Up @@ -73,6 +76,8 @@ go_test(
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/load",
"//pkg/multitenant/mtinfopb",
"//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security",
Expand Down
24 changes: 24 additions & 0 deletions pkg/kv/kvclient/kvtenant/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiespb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
Expand Down Expand Up @@ -190,6 +192,27 @@ type connector struct {
// notifyCh is closed when there are changes to overrides.
notifyCh chan struct{}
}

// testingEmulateOldVersionSettingsClient is set to true when the
// connector should emulate the version where it processed all
// events as settings events. Used only for testing.
testingEmulateOldVersionSettingsClient bool

metadataMu struct {
syncutil.Mutex

// receivedFirstMetadata is set to true when the first batch of
// metadata bits has been received.
receivedFirstMetadata bool

tenantName roachpb.TenantName
dataState mtinfopb.TenantDataState
serviceMode mtinfopb.TenantServiceMode
capabilities *tenantcapabilitiespb.TenantCapabilities

// notifyCh is closed when there are changes to the metadata.
notifyCh chan struct{}
}
}

// client represents an RPC client that proxies to a KV instance.
Expand Down Expand Up @@ -263,6 +286,7 @@ func NewConnector(cfg ConnectorConfig, addrs []string) Connector {
c.settingsMu.allTenantOverrides = make(map[string]settings.EncodedValue)
c.settingsMu.specificOverrides = make(map[string]settings.EncodedValue)
c.settingsMu.notifyCh = make(chan struct{})
c.metadataMu.notifyCh = make(chan struct{})
return c
}

Expand Down
38 changes: 36 additions & 2 deletions pkg/kv/kvclient/kvtenant/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -52,6 +53,8 @@ type mockServer struct {
rangeLookupFn func(context.Context, *kvpb.RangeLookupRequest) (*kvpb.RangeLookupResponse, error)
gossipSubFn func(*kvpb.GossipSubscriptionRequest, kvpb.Internal_GossipSubscriptionServer) error
tenantSettingsFn func(request *kvpb.TenantSettingsRequest, server kvpb.Internal_TenantSettingsServer) error

emulateOldVersionSettingServer bool
}

func (m *mockServer) RangeLookup(
Expand All @@ -70,18 +73,45 @@ func (m *mockServer) TenantSettings(
req *kvpb.TenantSettingsRequest, stream kvpb.Internal_TenantSettingsServer,
) error {
if m.tenantSettingsFn == nil {
// First message - required by startup protocol.
if err := stream.Send(&kvpb.TenantSettingsEvent{
EventType: kvpb.TenantSettingsEvent_SETTING_EVENT,
Precedence: kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES,
Incremental: false,
Overrides: nil,
}); err != nil {
return err
}
return stream.Send(&kvpb.TenantSettingsEvent{
if !m.emulateOldVersionSettingServer {
// Initial tenant metadata.
if err := stream.Send(&kvpb.TenantSettingsEvent{
EventType: kvpb.TenantSettingsEvent_METADATA_EVENT,
Name: "foo",
// TODO(knz): remove cast after the dep cycle has been resolved.
DataState: uint32(mtinfopb.DataStateReady),
ServiceMode: uint32(mtinfopb.ServiceModeExternal),

// Need to ensure this looks like a fake no-op setting override event.
Precedence: kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES,
Incremental: true,
}); err != nil {
return err
}
}
// Finish startup.
if err := stream.Send(&kvpb.TenantSettingsEvent{
EventType: kvpb.TenantSettingsEvent_SETTING_EVENT,
Precedence: kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES,
Incremental: false,
Overrides: nil,
})
}); err != nil {
return err
}

// Ensure the stream doesn't immediately finish, which can cause
// flakes in tests due to the retry loop in the client.
<-stream.Context().Done()
return nil
}
return m.tenantSettingsFn(req, stream)
}
Expand Down Expand Up @@ -212,6 +242,7 @@ func newConnector(cfg ConnectorConfig, addrs []string) *connector {
// kvcoord.NodeDescStore and as a config.SystemConfigProvider.
func TestConnectorGossipSubscription(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
Expand Down Expand Up @@ -370,6 +401,7 @@ func TestConnectorGossipSubscription(t *testing.T) {
// kvcoord.RangeDescriptorDB.
func TestConnectorRangeLookup(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
Expand Down Expand Up @@ -455,6 +487,7 @@ func TestConnectorRangeLookup(t *testing.T) {
// on one of them.
func TestConnectorRetriesUnreachable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
Expand Down Expand Up @@ -542,6 +575,7 @@ func TestConnectorRetriesUnreachable(t *testing.T) {
// immediately if it is not.
func TestConnectorRetriesError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
Expand Down
105 changes: 91 additions & 14 deletions pkg/kv/kvclient/kvtenant/setting_overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -48,6 +50,11 @@ func (c *connector) runTenantSettingsSubscription(ctx context.Context, startupCh
c.settingsMu.receivedFirstAllTenantOverrides = false
c.settingsMu.receivedFirstSpecificOverrides = false
}()
func() {
c.metadataMu.Lock()
defer c.metadataMu.Unlock()
c.metadataMu.receivedFirstMetadata = false
}()

for {
e, err := stream.Recv()
Expand Down Expand Up @@ -87,31 +94,99 @@ func (c *connector) runTenantSettingsSubscription(ctx context.Context, startupCh
continue
}

settingsReady, err := c.processSettingsEvent(e)
if err != nil {
log.Errorf(ctx, "error processing tenant settings event: %v", err)
_ = stream.CloseSend()
c.tryForgetClient(ctx, client)
break
if c.testingEmulateOldVersionSettingsClient {
// The old version of the settings client does not understand anything
// other than setting events.
e.EventType = kvpb.TenantSettingsEvent_SETTING_EVENT
}

// Signal that startup is complete once we have enough events to start.
if settingsReady {
log.Infof(ctx, "received initial tenant settings")
var reconnect bool
switch e.EventType {
case kvpb.TenantSettingsEvent_METADATA_EVENT:
err := c.processMetadataEvent(ctx, e)
if err != nil {
log.Errorf(ctx, "error processing tenant settings event: %v", err)
reconnect = true
}

case kvpb.TenantSettingsEvent_SETTING_EVENT:
settingsReady, err := c.processSettingsEvent(ctx, e)
if err != nil {
log.Errorf(ctx, "error processing tenant settings event: %v", err)
reconnect = true
break
}

if startupCh != nil {
startupCh <- nil
close(startupCh)
startupCh = nil
// Signal that startup is complete once we have enough events
// to start. Note: we do not connect this condition to
// receiving the tenant metadata (via processMetadataEvent) for
// compatibility with pre-v23.2 servers which only send
// setting override events.
//
// Luckily, we are guaranteed that once we receive the setting
// overrides the metadata has been received as well (in v23.1+
// servers that send it) because when it is sent it is always
// sent prior to the setting overrides.
if settingsReady {
log.Infof(ctx, "received initial tenant settings")

if startupCh != nil {
startupCh <- nil
close(startupCh)
startupCh = nil
}
}
}

if reconnect {
_ = stream.CloseSend()
c.tryForgetClient(ctx, client)
break
}
}
}
}

// processMetadataEvent stores the tenant metadata carried by e and
// notifies watchers that the metadata changed.
//
// The name, data state, service mode and capabilities from the event
// replace whatever was stored before, and receivedFirstMetadata is set.
// The notification channel is closed and replaced with a fresh one, all
// under metadataMu. This implementation always returns a nil error.
func (c *connector) processMetadataEvent(ctx context.Context, e *kvpb.TenantSettingsEvent) error {
	md := &c.metadataMu
	md.Lock()
	defer md.Unlock()

	md.receivedFirstMetadata = true
	md.tenantName = e.Name
	md.capabilities = e.Capabilities
	// TODO(knz): Remove the casts once the protobuf carries properly
	// typed fields, which requires breaking a dependency cycle.
	md.dataState = mtinfopb.TenantDataState(e.DataState)
	md.serviceMode = mtinfopb.TenantServiceMode(e.ServiceMode)

	log.Infof(ctx, "received tenant metadata: name=%q dataState=%v serviceMode=%v\ncapabilities=%+v",
		md.tenantName, md.dataState, md.serviceMode, md.capabilities)

	// Closing the current channel wakes up every watcher holding it; a
	// new channel is installed for the next round of watchers.
	close(md.notifyCh)
	md.notifyCh = make(chan struct{})

	return nil
}

// TenantInfo returns the tenant metadata currently held by the
// connector, together with the channel that processMetadataEvent closes
// on the next metadata update.
//
// The returned entry carries the stored capabilities pointer as-is (it
// is not deep-copied). Both return values are read under metadataMu.
func (c *connector) TenantInfo() (tenantcapabilities.Entry, <-chan struct{}) {
	c.metadataMu.Lock()
	defer c.metadataMu.Unlock()

	var entry tenantcapabilities.Entry
	entry.TenantID = c.tenantID
	entry.TenantCapabilities = c.metadataMu.capabilities
	entry.Name = c.metadataMu.tenantName
	entry.DataState = c.metadataMu.dataState
	entry.ServiceMode = c.metadataMu.serviceMode

	return entry, c.metadataMu.notifyCh
}

// processSettingsEvent updates the setting overrides based on the event.
func (c *connector) processSettingsEvent(
e *kvpb.TenantSettingsEvent,
ctx context.Context, e *kvpb.TenantSettingsEvent,
) (settingsReady bool, err error) {
c.settingsMu.Lock()
defer c.settingsMu.Unlock()
Expand Down Expand Up @@ -143,6 +218,8 @@ func (c *connector) processSettingsEvent(
return false, errors.Newf("unknown precedence value %d", e.Precedence)
}

log.Infof(ctx, "received %d setting overrides with precedence %v (incremental=%v)", len(e.Overrides), e.Precedence, e.Incremental)

// If the event is not incremental, clear the map.
if !e.Incremental {
for k := range m {
Expand Down
Loading