Skip to content

Commit

Permalink
Add E2E test for clusterset IP
Browse files Browse the repository at this point in the history
Also detect if clusterset IP is enabled globally by inspecting the
env var passed to the pod container in the LH agent deployemnt and, if
so, skip the other ClusterIP service discovery tests since they will
fail.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Sep 23, 2024
1 parent 70fcb84 commit 26efddb
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 21 deletions.
77 changes: 77 additions & 0 deletions test/e2e/discovery/clusterset_ip_enabled.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
SPDX-License-Identifier: Apache-2.0
Copyright Contributors to the Submariner project.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package discovery

import (
"fmt"
"strconv"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/lighthouse/pkg/constants"
lhframework "github.com/submariner-io/lighthouse/test/e2e/framework"
"github.com/submariner-io/shipyard/test/e2e/framework"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

var _ = Describe("Test Service Discovery Across Clusters", Label(ClusterSetIPTestLabel), func() {
f := lhframework.NewFramework("discovery")

When("clusterset IP is enabled for an exported service", func() {
It("should resolve the allocated clusterset IP", func() {
RunClusterSetIPTest(f)
})
})
})

func RunClusterSetIPTest(f *lhframework.Framework) {
clusterAName := framework.TestContext.ClusterIDs[framework.ClusterA]
clusterBName := framework.TestContext.ClusterIDs[framework.ClusterB]

framework.By(fmt.Sprintf("Creating an Nginx Deployment on on %q", clusterBName))
f.NewNginxDeployment(framework.ClusterB)

framework.By(fmt.Sprintf("Creating a Nginx Service on %q", clusterBName))

nginxServiceClusterB := f.NewNginxService(framework.ClusterB)

f.CreateServiceExport(framework.ClusterB, &mcsv1a1.ServiceExport{
ObjectMeta: metav1.ObjectMeta{
Name: nginxServiceClusterB.Name,
Namespace: nginxServiceClusterB.Namespace,
Annotations: map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)},
},
})

f.AwaitServiceExportedStatusCondition(framework.ClusterB, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace)

framework.By(fmt.Sprintf("Creating a Netshoot Deployment on %q", clusterAName))

netshootPodList := f.NewNetShootDeployment(framework.ClusterA)

svc, err := f.GetService(framework.ClusterB, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace)
Expect(err).NotTo(HaveOccurred())

nginxServiceClusterB = svc
serviceImport := f.AwaitAggregatedServiceImport(framework.ClusterA, nginxServiceClusterB, 1)
Expect(serviceImport.Spec.IPs).To(HaveLen(1), "ServiceImport was not allocated an IP")

f.VerifyIPWithDig(framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, "", serviceImport.Spec.IPs[0], true)
}
9 changes: 8 additions & 1 deletion test/e2e/discovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,21 @@ import (
)

const (
TestLabel = "service-discovery"
TestLabel = "service-discovery"
ClusterSetIPTestLabel = "clusterset-ip"
)

var checkedDomains = lhframework.CheckedDomains

var _ = Describe("Test Service Discovery Across Clusters", Label(TestLabel), func() {
f := lhframework.NewFramework("discovery")

BeforeEach(func() {
if lhframework.ClusterSetIPEnabled {
Skip("The clusterset IP feature is enabled globally - skipping the test")
}
})

When("a pod tries to resolve a service in a remote cluster", func() {
It("should be able to discover the remote service successfully", func() {
RunServiceDiscoveryTest(f)
Expand Down
69 changes: 49 additions & 20 deletions test/e2e/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/names"
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/slices"
"github.com/submariner-io/lighthouse/pkg/constants"
"github.com/submariner-io/shipyard/test/e2e/framework"
Expand Down Expand Up @@ -61,9 +64,10 @@ type Framework struct {
}

var (
MCSClients []*mcsClientset.Clientset
EndpointClients []dynamic.ResourceInterface
SubmarinerClients []dynamic.ResourceInterface
MCSClients []*mcsClientset.Clientset
EndpointClients []dynamic.ResourceInterface
SubmarinerClients []dynamic.ResourceInterface
ClusterSetIPEnabled = false
)

func init() {
Expand Down Expand Up @@ -97,6 +101,23 @@ func beforeSuite() {
}

framework.DetectGlobalnet()

for _, k8sClient := range framework.KubeClients {
framework.AwaitUntil("find lighthouse agent deployment", func() (interface{}, error) {
return k8sClient.AppsV1().Deployments(framework.TestContext.SubmarinerNamespace).Get(context.TODO(),
names.ServiceDiscoveryComponent, metav1.GetOptions{})
}, func(result interface{}) (bool, string, error) {
d := result.(*appsv1.Deployment)
for i := range d.Spec.Template.Spec.Containers[0].Env {
if d.Spec.Template.Spec.Containers[0].Env[i].Name == "SUBMARINER_CLUSTERSET_IP_ENABLED" &&
d.Spec.Template.Spec.Containers[0].Env[i].Value == strconv.FormatBool(true) {
ClusterSetIPEnabled = true
}
}

return true, "", nil
})
}
}

func createLighthouseClient(restConfig *rest.Config) *mcsClientset.Clientset {
Expand Down Expand Up @@ -131,18 +152,23 @@ func createSubmarinerClientSet(restConfig *rest.Config) dynamic.ResourceInterfac
}

func (f *Framework) NewServiceExport(cluster framework.ClusterIndex, name, namespace string) *mcsv1a1.ServiceExport {
nginxServiceExport := mcsv1a1.ServiceExport{
return f.CreateServiceExport(cluster, &mcsv1a1.ServiceExport{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: name,
Namespace: namespace,
},
}
se := MCSClients[cluster].MulticlusterV1alpha1().ServiceExports(namespace)
framework.By(fmt.Sprintf("Creating serviceExport %s.%s on %q", name, namespace, framework.TestContext.ClusterIDs[cluster]))
serviceExport := framework.AwaitUntil("create serviceExport", func() (interface{}, error) {
return se.Create(context.TODO(), &nginxServiceExport, metav1.CreateOptions{})
}, framework.NoopCheckResult).(*mcsv1a1.ServiceExport)
})
}

func (f *Framework) CreateServiceExport(cluster framework.ClusterIndex, serviceExport *mcsv1a1.ServiceExport) *mcsv1a1.ServiceExport {
se := MCSClients[cluster].MulticlusterV1alpha1().ServiceExports(serviceExport.Namespace)

framework.By(fmt.Sprintf("Creating serviceExport %s.%s on %q", serviceExport.Name, serviceExport.Namespace,
framework.TestContext.ClusterIDs[cluster]))

return serviceExport
return framework.AwaitUntil("create serviceExport", func() (interface{}, error) {
return se.Create(context.TODO(), serviceExport, metav1.CreateOptions{})
}, framework.NoopCheckResult).(*mcsv1a1.ServiceExport)
}

func (f *Framework) AwaitServiceExportedStatusCondition(cluster framework.ClusterIndex, name, namespace string) {
Expand Down Expand Up @@ -186,14 +212,17 @@ func (f *Framework) GetService(cluster framework.ClusterIndex, name, namespace s
return framework.KubeClients[cluster].CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}

func (f *Framework) AwaitAggregatedServiceImport(targetCluster framework.ClusterIndex, svc *v1.Service, clusterCount int) {
func (f *Framework) AwaitAggregatedServiceImport(targetCluster framework.ClusterIndex, svc *v1.Service, clusterCount int,
) *mcsv1a1.ServiceImport {
framework.By(fmt.Sprintf("Retrieving ServiceImport for %q in ns %q on %q", svc.Name, svc.Namespace,
framework.TestContext.ClusterIDs[targetCluster]))

si := MCSClients[targetCluster].MulticlusterV1alpha1().ServiceImports(svc.Namespace)
var si *mcsv1a1.ServiceImport

siClient := MCSClients[targetCluster].MulticlusterV1alpha1().ServiceImports(svc.Namespace)

framework.AwaitUntil("retrieve ServiceImport", func() (interface{}, error) {
obj, err := si.Get(context.TODO(), svc.Name, metav1.GetOptions{})
obj, err := siClient.Get(context.TODO(), svc.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return nil, nil //nolint:nilnil // Intentional
}
Expand All @@ -212,7 +241,7 @@ func (f *Framework) AwaitAggregatedServiceImport(targetCluster framework.Cluster
return false, "ServiceImport not found", nil
}

si := result.(*mcsv1a1.ServiceImport)
si = result.(*mcsv1a1.ServiceImport)

if len(si.Status.Clusters) != clusterCount {
return false, fmt.Sprintf("Actual cluster count %d does not match expected %d",
Expand All @@ -231,14 +260,14 @@ func (f *Framework) AwaitAggregatedServiceImport(targetCluster framework.Cluster
if svc.Spec.ClusterIP != v1.ClusterIPNone && !slices.Equivalent(expPorts, si.Spec.Ports, func(p mcsv1a1.ServicePort) string {
return fmt.Sprintf("%s%s%d", p.Name, p.Protocol, p.Port)
}) {
s1, _ := json.MarshalIndent(expPorts, "", " ")
s2, _ := json.MarshalIndent(si.Spec.Ports, "", " ")

return false, fmt.Sprintf("ServiceImport ports do not match. Expected: %s, Actual: %s", s1, s2), nil
return false, fmt.Sprintf("ServiceImport ports do not match. Expected: %s, Actual: %s",
resource.ToJSON(expPorts), resource.ToJSON(si.Spec.Ports)), nil
}

return true, "", nil
})

return si
}

func (f *Framework) NewHeadlessServiceWithParams(name, portName string, protcol v1.Protocol,
Expand Down

0 comments on commit 26efddb

Please sign in to comment.