Skip to content

Commit

Permalink
kvtenant: define a cross-version metadata protocol test
Browse files Browse the repository at this point in the history
This commit introduces a new test to check that old-version tenant
clients can operate with a new-version KV server, and vice-versa.

Release note: None
  • Loading branch information
knz committed Jul 7, 2023
1 parent 0edec3d commit 952e767
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 13 deletions.
9 changes: 9 additions & 0 deletions pkg/kv/kvclient/kvtenant/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,18 @@ type connector struct {
notifyCh chan struct{}
}

// testingEmulateOldVersionSettingsClient is set to true when the
// connector should emulate the version where it processed all
// events as settings events. Used only for testing.
testingEmulateOldVersionSettingsClient bool

metadataMu struct {
syncutil.Mutex

// receivedFirstMetadata is set to true when the first batch of
// metadata bits has been received.
receivedFirstMetadata bool

tenantName roachpb.TenantName
dataState mtinfopb.TenantDataState
serviceMode mtinfopb.TenantServiceMode
Expand Down
30 changes: 17 additions & 13 deletions pkg/kv/kvclient/kvtenant/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type mockServer struct {
rangeLookupFn func(context.Context, *kvpb.RangeLookupRequest) (*kvpb.RangeLookupResponse, error)
gossipSubFn func(*kvpb.GossipSubscriptionRequest, kvpb.Internal_GossipSubscriptionServer) error
tenantSettingsFn func(request *kvpb.TenantSettingsRequest, server kvpb.Internal_TenantSettingsServer) error

emulateOldVersionSettingServer bool
}

func (m *mockServer) RangeLookup(
Expand Down Expand Up @@ -80,19 +82,21 @@ func (m *mockServer) TenantSettings(
}); err != nil {
return err
}
// Initial tenant metadata.
if err := stream.Send(&kvpb.TenantSettingsEvent{
EventType: kvpb.TenantSettingsEvent_METADATA_EVENT,
Name: "foo",
// TODO(knz): remove cast after the dep cycle has been resolved.
DataState: uint32(mtinfopb.DataStateReady),
ServiceMode: uint32(mtinfopb.ServiceModeExternal),

// Make the event look like a fake no-op setting event.
Precedence: kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES,
Incremental: true,
}); err != nil {
return err
if !m.emulateOldVersionSettingServer {
// Initial tenant metadata.
if err := stream.Send(&kvpb.TenantSettingsEvent{
EventType: kvpb.TenantSettingsEvent_METADATA_EVENT,
Name: "foo",
// TODO(knz): remove cast after the dep cycle has been resolved.
DataState: uint32(mtinfopb.DataStateReady),
ServiceMode: uint32(mtinfopb.ServiceModeExternal),

// Need to ensure this looks like a fake no-op setting override event.
Precedence: kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES,
Incremental: true,
}); err != nil {
return err
}
}
// Finish startup.
return stream.Send(&kvpb.TenantSettingsEvent{
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvclient/kvtenant/setting_overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func (c *connector) runTenantSettingsSubscription(ctx context.Context, startupCh
c.settingsMu.receivedFirstAllTenantOverrides = false
c.settingsMu.receivedFirstSpecificOverrides = false
}()
func() {
c.metadataMu.Lock()
defer c.metadataMu.Unlock()
c.metadataMu.receivedFirstMetadata = false
}()

for {
e, err := stream.Recv()
Expand All @@ -73,6 +78,12 @@ func (c *connector) runTenantSettingsSubscription(ctx context.Context, startupCh
continue
}

if c.testingEmulateOldVersionSettingsClient {
// The old version of the settings client does not understand anything
// else than setting events.
e.EventType = kvpb.TenantSettingsEvent_SETTING_EVENT
}

var reconnect bool
switch e.EventType {
case kvpb.TenantSettingsEvent_METADATA_EVENT:
Expand Down Expand Up @@ -125,6 +136,7 @@ func (c *connector) processMetadataEvent(ctx context.Context, e *kvpb.TenantSett
c.metadataMu.Lock()
defer c.metadataMu.Unlock()

c.metadataMu.receivedFirstMetadata = true
c.metadataMu.capabilities = e.Capabilities
c.metadataMu.tenantName = e.Name
// TODO(knz): Remove the cast once we have proper typing in the
Expand Down
76 changes: 76 additions & 0 deletions pkg/kv/kvclient/kvtenant/setting_overrides_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,79 @@ func expectMetadata(t *testing.T, c *connector, exp string) <-chan struct{} {

return updateCh
}

// TestCrossVersionMetadataSupport tests that and old-version
// connector can talk to a new-version server and a new-version server
// can talk to an old-version connector.
func TestCrossVersionMetadataSupport(t *testing.T) {
defer leaktest.AfterTest(t)()

for _, b := range []bool{false, true} {
oldVersionClient := b
oldVersionServer := !b
strs := map[bool]string{false: "new", true: "old"}
t.Run(fmt.Sprintf("client=%s/server=%s", strs[oldVersionClient], strs[oldVersionServer]), func(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
s, err := rpc.NewServer(rpcContext)
require.NoError(t, err)

gossipSubFn := func(req *kvpb.GossipSubscriptionRequest, stream kvpb.Internal_GossipSubscriptionServer) error {
return stream.Send(gossipEventForClusterID(rpcContext.StorageClusterID.Get()))
}
server := &mockServer{
gossipSubFn: gossipSubFn,
emulateOldVersionSettingServer: oldVersionServer,
}
kvpb.RegisterInternalServer(s, server)
ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr)
require.NoError(t, err)

cfg := ConnectorConfig{
TenantID: roachpb.MustMakeTenantID(5),
AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()),
RPCContext: rpcContext,
RPCRetryOptions: rpcRetryOpts,
}
addrs := []string{ln.Addr().String()}
c := newConnector(cfg, addrs)
c.testingEmulateOldVersionSettingsClient = oldVersionClient

// Start the connector.
startedC := make(chan error)
go func() {
startedC <- c.Start(ctx)
}()
select {
case err := <-startedC:
require.NoError(t, err)
case <-time.After(10 * time.Second):
t.Fatalf("failed to see start complete")
}

// In any case check that the overrides are available.
func() {
t.Helper()
c.settingsMu.Lock()
defer c.settingsMu.Unlock()

require.True(t, c.settingsMu.receivedFirstAllTenantOverrides)
require.True(t, c.settingsMu.receivedFirstSpecificOverrides)
}()

// If either the client or the server is new, the metadata is
// not communicated or not processed.
receivedFirstMetadata := func() bool {
c.metadataMu.Lock()
defer c.metadataMu.Unlock()
return c.metadataMu.receivedFirstMetadata
}()
require.False(t, receivedFirstMetadata)
expectMetadata(t, c, `tid=5 name="" data=add service=none caps=<nil>`)
})
}
}

0 comments on commit 952e767

Please sign in to comment.