Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
(feat/statefulsets): MeshService API changes for Headless Services (#…
Browse files Browse the repository at this point in the history
…4704)

* Introduce service.ProviderMapper to MeshService

As a stepping stone for statefulset support (#3477), introduce a new
interface describing the ability to map an entity back to a provider
service (e.g. a Kubernetes service). This decouples the MeshService name
from being a foreign key between the provider's collection of services
and the MeshCatalog's set of services

Signed-off-by: Keith Mattix II <[email protected]>

* Add subdomain field to Meshservice

Signed-off-by: Keith Mattix II <[email protected]>

* Write tests for headless service functionality

Signed-off-by: Keith Mattix II <[email protected]>

* Address PR comments

Signed-off-by: Keith Mattix II <[email protected]>

* Filter MeshServices from headless service based on subdomain

When retrieving MeshServices in order to create local clusters for a pod,
exclude MeshServices whose subdomains don't match the pod's name

Signed-off-by: Keith Mattix II <[email protected]>

* Implement MeshService creation functions

Tracking the unexported MeshService fields is difficult, and there
are several hidden bugs that can occur depending on what fields are
accessed. So, we create the NewMeshService and NewPartialMeshService
functions to aid in correct usage

Signed-off-by: Keith Mattix II <[email protected]>

* Fix arg order

Signed-off-by: Keith Mattix II <[email protected]>

* Remove unnecessary function & newline

Signed-off-by: Keith Mattix II <[email protected]>

* Re-duplicate svc to meshsvc code and de-memoize MeshService

Signed-off-by: Keith Mattix II <[email protected]>

* Remove unneccesary Equals function

Signed-off-by: Keith Mattix II <[email protected]>

* Remove unneeded newline

Signed-off-by: Keith Mattix II <[email protected]>

* Add mock expectation

Signed-off-by: Keith Mattix II <[email protected]>

* Add kubecontroller to splitHostname functions

Signed-off-by: Keith Mattix II <[email protected]>

* Add mock test expectation

Signed-off-by: Keith Mattix II <[email protected]>

* Fix tests

Signed-off-by: Keith Mattix II <[email protected]>

* Fix tests again

Signed-off-by: Keith Mattix II <[email protected]>

* Address PR comments

Signed-off-by: Keith Mattix II <[email protected]>

* Remove unneded function

Signed-off-by: Keith Mattix II <[email protected]>

* Comment exported function

Signed-off-by: Keith Mattix II <[email protected]>

* Additional PR fixes

Signed-off-by: Keith Mattix II <[email protected]>

* Simplify tests

Signed-off-by: Keith Mattix II <[email protected]>
  • Loading branch information
keithmattix authored May 19, 2022
1 parent 3395da5 commit 0af42df
Show file tree
Hide file tree
Showing 26 changed files with 954 additions and 340 deletions.
9 changes: 8 additions & 1 deletion pkg/catalog/inbound_traffic_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,20 @@ func (mc *MeshCatalog) getUpstreamServicesIncludeApex(upstreamServices []service
}

for _, split := range mc.meshSpec.ListTrafficSplits(smi.WithTrafficSplitBackendService(svc)) {
svcName := k8s.GetServiceFromHostname(mc.kubeController, split.Spec.Service)
subdomain := k8s.GetSubdomainFromHostname(mc.kubeController, split.Spec.Service)
apexMeshService := service.MeshService{
Namespace: svc.Namespace,
Name: k8s.GetServiceFromHostname(split.Spec.Service),
Name: svcName,
Port: svc.Port,
TargetPort: svc.TargetPort,
Protocol: svc.Protocol,
}

if subdomain != "" {
apexMeshService.Name = fmt.Sprintf("%s.%s", subdomain, svcName)
}

if newlyAdded := svcSet.Add(apexMeshService); newlyAdded {
allServices = append(allServices, apexMeshService)
}
Expand Down
102 changes: 102 additions & 0 deletions pkg/catalog/inbound_traffic_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {
permissiveMode bool
trafficTargets []*access.TrafficTarget
httpRouteGroups []*spec.HTTPRouteGroup
tcpRoutes []*spec.TCPRoute
trafficSplits []*split.TrafficSplit
prepare func(mockMeshSpec *smi.MockMeshSpec, trafficSplits []*split.TrafficSplit)
expectedInboundMeshPolicy *trafficpolicy.InboundMeshTrafficPolicy
Expand Down Expand Up @@ -213,6 +214,104 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {
},
},
},
{
name: "multiple services, statefulset, SMI mode, 1 TrafficTarget, 1 TCPRoute, 0 TrafficSplit",
upstreamIdentity: upstreamSvcAccount.ToServiceIdentity(),
upstreamServices: []service.MeshService{
{
Name: "mysql-0.mysql",
Namespace: "ns1",
Port: 3306,
TargetPort: 3306,
Protocol: "tcp",
},
{
Name: "s2",
Namespace: "ns1",
Port: 90,
TargetPort: 9090,
Protocol: "http",
},
},
permissiveMode: false,
trafficTargets: []*access.TrafficTarget{
{
TypeMeta: metav1.TypeMeta{
APIVersion: "access.smi-spec.io/v1alpha3",
Kind: "TrafficTarget",
},
ObjectMeta: metav1.ObjectMeta{
Name: "t1",
Namespace: "ns1",
},
Spec: access.TrafficTargetSpec{
Destination: access.IdentityBindingSubject{
Kind: "ServiceAccount",
Name: "sa1",
Namespace: "ns1",
},
Sources: []access.IdentityBindingSubject{{
Kind: "ServiceAccount",
Name: "sa2",
Namespace: "ns2",
}},
Rules: []access.TrafficTargetRule{{
Kind: "TCPRoute",
Name: "rule-1",
}},
},
},
},
tcpRoutes: []*spec.TCPRoute{
{
TypeMeta: metav1.TypeMeta{
APIVersion: "specs.smi-spec.io/v1alpha4",
Kind: "TCPRoute",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "rule-1",
},
Spec: spec.TCPRouteSpec{
Matches: spec.TCPMatch{
Ports: []int{3306},
},
},
},
},
trafficSplits: nil,
prepare: func(mockMeshSpec *smi.MockMeshSpec, trafficSplits []*split.TrafficSplit) {
mockMeshSpec.EXPECT().ListTrafficSplits(gomock.Any()).Return(trafficSplits).AnyTimes()
},
expectedInboundMeshPolicy: &trafficpolicy.InboundMeshTrafficPolicy{
TrafficMatches: []*trafficpolicy.TrafficMatch{
{
Name: "inbound_ns1/mysql-0.mysql_3306_tcp",
DestinationPort: 3306,
DestinationProtocol: "tcp",
},
{
Name: "inbound_ns1/s2_9090_http",
DestinationPort: 9090,
DestinationProtocol: "http",
},
},
ClustersConfigs: []*trafficpolicy.MeshClusterConfig{
{
Name: "ns1/mysql-0.mysql|3306|local",
Service: service.MeshService{Namespace: "ns1", Name: "mysql-0.mysql", Port: 3306, TargetPort: 3306, Protocol: "tcp"},
Address: "127.0.0.1",
Port: 3306,
},
{
Name: "ns1/s2|9090|local",
Service: service.MeshService{Namespace: "ns1", Name: "s2", Port: 90, TargetPort: 9090, Protocol: "http"},
Address: "127.0.0.1",
Port: 9090,
},
},
},
},
{
name: "multiple services, SMI mode, 1 TrafficTarget, multiple HTTPRouteGroup, 0 TrafficSplit",
upstreamIdentity: upstreamSvcAccount.ToServiceIdentity(),
Expand Down Expand Up @@ -1614,6 +1713,9 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {
for expectedKey, expectedVal := range tc.expectedInboundMeshPolicy.HTTPRouteConfigsPerPort {
assert.ElementsMatch(expectedVal, actual.HTTPRouteConfigsPerPort[expectedKey])
}
if len(tc.expectedInboundMeshPolicy.TrafficMatches) != 0 {
assert.ElementsMatch(tc.expectedInboundMeshPolicy.TrafficMatches, actual.TrafficMatches)
}
})
}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/catalog/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ func (mc *MeshCatalog) GetIngressTrafficPolicy(svc service.MeshService) (*traffi
for _, source := range ingressBackendPolicy.Spec.Sources {
switch source.Kind {
case policyV1alpha1.KindService:
sourceMeshSvc := service.MeshService{Name: source.Name, Namespace: source.Namespace}
sourceMeshSvc := service.MeshService{
Name: source.Name,
Namespace: source.Namespace,
}
endpoints := mc.listEndpointsForService(sourceMeshSvc)
if len(endpoints) == 0 {
ingressBackendWithStatus.Status = policyV1alpha1.IngressBackendStatus{
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/outbound_traffic_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (mc *MeshCatalog) listAllowedUpstreamServicesIncludeApex(downstreamIdentity
}
for _, backend := range split.Spec.Backends {
if backend.Service == upstreamSvc.Name {
rootServiceName := k8s.GetServiceFromHostname(split.Spec.Service)
rootServiceName := k8s.GetServiceFromHostname(mc.kubeController, split.Spec.Service)
rootMeshService := service.MeshService{
Namespace: split.Namespace,
Name: rootServiceName,
Expand Down
59 changes: 34 additions & 25 deletions pkg/catalog/outbound_traffic_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestGetOutboundMeshTrafficPolicy(t *testing.T) {
Namespace: "ns3",
},
Spec: split.TrafficSplitSpec{
Service: "s3.ns3.cluster.local",
Service: "s3.ns3.svc.cluster.local",
Backends: []split.TrafficSplitBackend{
{
Service: "s3-v1",
Expand Down Expand Up @@ -767,29 +767,13 @@ func TestGetDestinationServicesFromTrafficTarget(t *testing.T) {
}

func TestListAllowedUpstreamServicesIncludeApex(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockMeshSpec := smi.NewMockMeshSpec(mockCtrl)
mockConfigurator := configurator.NewMockConfigurator(mockCtrl)
mockController := k8s.NewMockController(mockCtrl)
mockServiceProvider := service.NewMockProvider(mockCtrl)
mockConfigurator.EXPECT().GetFeatureFlags().Return(configv1alpha2.FeatureFlags{EnableMulticlusterMode: true}).AnyTimes()
mockConfigurator.EXPECT().GetOSMNamespace().Return("osm-system").AnyTimes()

mc := MeshCatalog{
meshSpec: mockMeshSpec,
kubeController: mockController,
configurator: mockConfigurator,
serviceProviders: []service.Provider{mockServiceProvider},
}

testCases := []struct {
name string
id identity.ServiceIdentity
services []*corev1.Service
trafficSplits []*split.TrafficSplit
expected []service.MeshService
name string
id identity.ServiceIdentity
services []*corev1.Service
trafficSplits []*split.TrafficSplit
expected []service.MeshService
foundNamespace bool
}{
{
name: "no allowed outbound services",
Expand Down Expand Up @@ -857,6 +841,7 @@ func TestListAllowedUpstreamServicesIncludeApex(t *testing.T) {
},
},
},
foundNamespace: true,
expected: []service.MeshService{
{
Name: "split-svc",
Expand All @@ -882,8 +867,9 @@ func TestListAllowedUpstreamServicesIncludeApex(t *testing.T) {
},
},
{
name: "TrafficSplit apex service should not have duplicate when it does not have endpoints",
id: "my-src-ns.my-src-name",
name: "TrafficSplit apex service should not have duplicate when it does not have endpoints",
id: "my-src-ns.my-src-name",
foundNamespace: true,
services: []*corev1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -981,6 +967,22 @@ func TestListAllowedUpstreamServicesIncludeApex(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockMeshSpec := smi.NewMockMeshSpec(mockCtrl)
mockConfigurator := configurator.NewMockConfigurator(mockCtrl)
mockController := k8s.NewMockController(mockCtrl)
mockServiceProvider := service.NewMockProvider(mockCtrl)
mockConfigurator.EXPECT().GetFeatureFlags().Return(configv1alpha2.FeatureFlags{EnableMulticlusterMode: true}).AnyTimes()
mockConfigurator.EXPECT().GetOSMNamespace().Return("osm-system").AnyTimes()

mc := MeshCatalog{
meshSpec: mockMeshSpec,
kubeController: mockController,
configurator: mockConfigurator,
serviceProviders: []service.Provider{mockServiceProvider},
}
var meshServices []service.MeshService

for _, k8sSvc := range tc.services {
Expand All @@ -993,6 +995,13 @@ func TestListAllowedUpstreamServicesIncludeApex(t *testing.T) {
mockMeshSpec.EXPECT().ListTrafficSplits().Return(tc.trafficSplits).Times(1)
}

var ns *corev1.Namespace
if tc.foundNamespace {
ns = &corev1.Namespace{}
}

mockController.EXPECT().GetNamespace(gomock.Any()).Return(ns).AnyTimes()

tassert.ElementsMatch(t, tc.expected, mc.listAllowedUpstreamServicesIncludeApex(tc.id))
})
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/catalog/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ func (mc *MeshCatalog) getRetryPolicy(downstreamIdentity identity.ServiceIdentit
log.Error().Msgf("Retry policy destinations must be a service: %s is a %s", dest, dest.Kind)
continue
}

if upstreamSvc.Name == dest.Name && upstreamSvc.Namespace == dest.Namespace {
destMeshSvc := service.MeshService{Name: dest.Name, Namespace: dest.Namespace}
// we want all statefulset replicas to have the same retry policy regardless of how they're accessed
// for the default use-case, this is equivalent to a name + namespace equality check
if upstreamSvc.SiblingTo(destMeshSvc) {
// Will return retry policy that applies to the specific upstream service
return &retryCRD.Spec.RetryPolicy
}
Expand Down
Loading

0 comments on commit 0af42df

Please sign in to comment.