Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
Feature/statefulsets: fix protocol detection for ports (#4752)
Browse files Browse the repository at this point in the history
Fixes k8s service port protocol detection and adds a 
headless service example to the demo. 

Fixes #3477

Signed-off-by: Keith Mattix II <[email protected]>
  • Loading branch information
keithmattix authored May 23, 2022
1 parent d1ef8b1 commit 9b11d76
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 2 deletions.
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ export BOOKWAREHOUSE_NAMESPACE=bookwarehouse
# Default: true
#export PUBLISH_IMAGES=true

### optional: The local proxy mode for the control plane
# Default: Localhost
# export LOCAL_PROXY_MODE=Localhost

# See ./demo/deploy-vault.sh script on an example of how to deploy Hashicorp Vault
# to your Kubernetes cluster.
#--------------------------------------------------------------------------------
60 changes: 60 additions & 0 deletions demo/deploy-zookeeper.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/bin/bash

set -aueo pipefail

# shellcheck disable=SC1091
source .env
DEPLOY_ON_OPENSHIFT="${DEPLOY_ON_OPENSHIFT:-false}"
USE_PRIVATE_REGISTRY="${USE_PRIVATE_REGISTRY:-false}"
MESH_NAME="${MESH_NAME:-osm}"

kubectl create ns zookeeper

bin/osm namespace add --mesh-name "$MESH_NAME" zookeeper

bin/osm metrics enable --namespace zookeeper


helm install kafka bitnami/zookeeper --set replicaCount=3 --set serviceAccount.create=true --set serviceAccount.name=zookeeper --namespace zookeeper

if [ "$DEPLOY_ON_OPENSHIFT" = true ] ; then
oc adm policy add-scc-to-user privileged -z "zookeeper" -n "zookeeper"
if [ "$USE_PRIVATE_REGISTRY" = true ]; then
oc secrets link "zookeeper" "$CTR_REGISTRY_CREDS_NAME" --for=pull -n "zookeeper"
fi
fi

kubectl apply -nzookeeper -f - <<EOF
kind: TrafficTarget
apiVersion: access.smi-spec.io/v1alpha3
metadata:
name: zookeeper
namespace: zookeeper
spec:
destination:
kind: ServiceAccount
name: zookeeper
namespace: zookeeper
rules:
- kind: TCPRoute
name: zookeeper
sources:
- kind: ServiceAccount
name: zookeeper
namespace: zookeeper
---
apiVersion: specs.smi-spec.io/v1alpha4
kind: TCPRoute
metadata:
name: zookeeper
namespace: zookeeper
spec:
matches:
ports:
- 2181
- 3181
- 2888
- 3888
EOF

3 changes: 3 additions & 0 deletions demo/run-osm-demo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ ENABLE_FLUENTBIT="${ENABLE_FLUENTBIT:-false}"
DEPLOY_PROMETHEUS="${DEPLOY_PROMETHEUS:-false}"
DEPLOY_WITH_SAME_SA="${DEPLOY_WITH_SAME_SA:-false}"
ENVOY_LOG_LEVEL="${ENVOY_LOG_LEVEL:-debug}"
LOCAL_PROXY_MODE="${LOCAL_PROXY_MODE:-Localhost}"
DEPLOY_ON_OPENSHIFT="${DEPLOY_ON_OPENSHIFT:-false}"
TIMEOUT="${TIMEOUT:-90s}"
USE_PRIVATE_REGISTRY="${USE_PRIVATE_REGISTRY:-false}"
Expand Down Expand Up @@ -120,6 +121,7 @@ if [ "$CERT_MANAGER" = "vault" ]; then
--set=osm.deployPrometheus="$DEPLOY_PROMETHEUS" \
--set=osm.envoyLogLevel="$ENVOY_LOG_LEVEL" \
--set=osm.controllerLogLevel="trace" \
--set=osm.localProxyMode="$LOCAL_PROXY_MODE" \
--timeout="$TIMEOUT" \
$optionalInstallArgs
else
Expand All @@ -144,6 +146,7 @@ else
--set=osm.deployPrometheus="$DEPLOY_PROMETHEUS" \
--set=osm.envoyLogLevel="$ENVOY_LOG_LEVEL" \
--set=osm.controllerLogLevel="trace" \
--set=osm.localProxyMode="$LOCAL_PROXY_MODE" \
--timeout="$TIMEOUT" \
$optionalInstallArgs
fi
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/outbound_traffic_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (mc *MeshCatalog) getDestinationServicesFromTrafficTarget(t *access.Traffic

// listAllowedUpstreamServicesIncludeApex returns a list of services the given downstream service identity
// is authorized to communicate with, including traffic split apex services that are not backed by
// pods.
// pods as well as other sibling pods from the same headless service.
func (mc *MeshCatalog) listAllowedUpstreamServicesIncludeApex(downstreamIdentity identity.ServiceIdentity) []service.MeshService {
upstreamServices := mc.ListOutboundServicesForIdentity(downstreamIdentity)
if len(upstreamServices) == 0 {
Expand Down
5 changes: 5 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,8 @@ const (
// ServiceAliveResponse is the response returned by the server to indicate it is alive
ServiceAliveResponse = "Service is alive"
)

var (
// SupportedProtocolsInMesh is a list of the protocols OSM supports for in-mesh traffic
SupportedProtocolsInMesh = []string{ProtocolTCPServerFirst, ProtocolHTTP, ProtocolTCP, ProtocolGRPC}
)
18 changes: 17 additions & 1 deletion pkg/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"strings"

mapset "github.com/deckarep/golang-set"
"github.com/pkg/errors"
Expand Down Expand Up @@ -353,9 +354,24 @@ func ServiceToMeshServices(c Controller, svc corev1.Service) []service.MeshServi
Namespace: svc.Namespace,
Name: svc.Name,
Port: uint16(portSpec.Port),
Protocol: pointer.StringDeref(portSpec.AppProtocol, constants.ProtocolHTTP),
}

// attempt to parse protocol from port name
// Order of Preference is:
// 1. port.appProtocol field
// 2. protocol prefixed to port name (e.g. tcp-my-port)
// 3. default to http
protocol := constants.ProtocolHTTP
for _, p := range constants.SupportedProtocolsInMesh {
if strings.HasPrefix(portSpec.Name, p+"-") {
protocol = p
break
}
}

// use port.appProtocol if specified, else use port protocol
meshSvc.Protocol = pointer.StringDeref(portSpec.AppProtocol, protocol)

// The endpoints for the kubernetes service carry information that allows
// us to retrieve the TargetPort for the MeshService.
endpoints, _ := c.GetEndpoints(meshSvc)
Expand Down
49 changes: 49 additions & 0 deletions pkg/k8s/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,55 @@ func TestK8sServicesToMeshServices(t *testing.T) {
},
},
},
{
name: "k8s service with single port and endpoint, no appProtocol set, protocol in port name",
// Single port on the service maps to a single MeshService.
// Since no appProtocol is specified, MeshService.Protocol should match
// the protocol specified in the port name
svc: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "s1",
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "tcp-p1",
Port: 80,
},
},
ClusterIP: "10.0.0.1",
},
},
svcEndpoints: []runtime.Object{
&corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
// Should match svc.Name and svc.Namespace
Namespace: "ns1",
Name: "s1",
},
Subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{
{
// Must match the port of 'svc.Spec.Ports[0]'
Port: 8080, // TargetPort
},
},
},
},
},
},
expected: []service.MeshService{
{
Namespace: "ns1",
Name: "s1",
Port: 80,
TargetPort: 8080,
Protocol: "tcp",
},
},
},
{
name: "k8s headless service with single port and endpoint, no appProtocol set",
// Single port on the service maps to a single MeshService.
Expand Down
148 changes: 148 additions & 0 deletions tests/e2e/e2e_statefulsets_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package e2e

import (
"context"
"strings"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/rs/zerolog/log"
"github.com/servicemeshinterface/smi-sdk-go/pkg/apis/access/v1alpha3"
"github.com/servicemeshinterface/smi-sdk-go/pkg/apis/specs/v1alpha4"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/cli"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
. "github.com/openservicemesh/osm/tests/framework"
)

var _ = OSMDescribe("Test traffic among Statefulset members",
OSMDescribeInfo{
Tier: 1,
Bucket: 9,
OS: OSCrossPlatform,
},
func() {
Context("Statefulsets", func() {
It("pods succeed while establishing consensus", func() {
// Install OSM (with proxyMode = podIP)
Expect(Td.InstallOSM(Td.GetOSMInstallOpts(WithLocalProxyMode(v1alpha2.LocalProxyModePodIP)))).To(Succeed())

const testNS = "zookeeper"

// Create Test NS
Expect(Td.CreateNs(testNS, nil)).To(Succeed())
Expect(Td.AddNsToMesh(true, testNS)).To(Succeed())

helmCfg := &action.Configuration{}
Expect(helmCfg.Init(Td.Env.RESTClientGetter(), testNS, "secret", log.Info().Msgf)).To(Succeed())

install := action.NewInstall(helmCfg)

install.ReleaseName = "kafka"
install.Namespace = testNS
install.Timeout = 30 * time.Second
saName := "zookeeper"
replicaCount := 3

cli := cli.New()
chartPath, err := install.LocateChart("https://charts.bitnami.com/bitnami/zookeeper-9.0.2.tgz", cli)
Expect(err).NotTo(HaveOccurred())
chart, err := loader.Load(chartPath)
Expect(err).NotTo(HaveOccurred())

// Install zookeeper
_, err = install.Run(chart, map[string]interface{}{
"replicaCount": replicaCount,
"serviceAccount": map[string]interface{}{
"create": true,
"name": saName,
},
})

Expect(err).NotTo(HaveOccurred())

// Create SMI resources for Zookeeper
_, err = Td.CreateTCPRoute(testNS, v1alpha4.TCPRoute{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNS,
Name: "zookeeper",
},
Spec: v1alpha4.TCPRouteSpec{
Matches: v1alpha4.TCPMatch{
Ports: []int{2181, 3181, 2888, 3888},
},
},
})
Expect(err).NotTo(HaveOccurred())

_, err = Td.CreateTrafficTarget(testNS, v1alpha3.TrafficTarget{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNS,
Name: "zookeeper",
},
Spec: v1alpha3.TrafficTargetSpec{
Sources: []v1alpha3.IdentityBindingSubject{
{
Kind: "ServiceAccount",
Name: saName,
Namespace: testNS,
},
},
Destination: v1alpha3.IdentityBindingSubject{
Kind: "ServiceAccount",
Name: saName,
Namespace: testNS,
},
Rules: []v1alpha3.TrafficTargetRule{
{
Kind: "TCPRoute",
Name: "zookeeper",
},
},
},
})
Expect(err).NotTo(HaveOccurred())

Expect(Td.WaitForPodsRunningReady(testNS, 90*time.Second, replicaCount, nil)).To(Succeed())

pods, err := Td.Client.CoreV1().Pods(testNS).List(context.TODO(), metav1.ListOptions{})

Expect(err).NotTo(HaveOccurred())

// this command will exit 1 if connectivity isn't established
cmd := "/opt/bitnami/zookeeper/bin/zkServer.sh status"

cond := Td.WaitForRepeatedSuccess(func() bool {

results := map[string]error{}
for _, po := range pods.Items {
stdout, stderr, err := Td.RunRemote(testNS, po.GetName(), "zookeeper", strings.Fields(cmd))

Td.T.Logf("> (%s) Stdout %s | Stderr: %s", po.GetName(), stdout, stderr)

results[po.GetName()] = err
}

hadErr := false
for podName, err := range results {
if err != nil {
Td.T.Logf("> (%s) ZK status check failed: expected nil err, got %s", podName, err)
hadErr = true
continue
}

Td.T.Logf("> (%s) ZK status check succeeded!", podName)
}

return !hadErr
}, 1, 90*time.Second)

Expect(cond).To(BeTrue())

})
})
})

0 comments on commit 9b11d76

Please sign in to comment.