diff --git a/scripts/run-static-canary.sh b/scripts/run-static-canary.sh
new file mode 100755
index 0000000000..91087b06cd
--- /dev/null
+++ b/scripts/run-static-canary.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+# This script runs the amazon-vpc-cni static canary tests.
+# The tests in this suite are designed to exercise AZ failure scenarios.
+
+set -e
+
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+GINKGO_TEST_BUILD="$SCRIPT_DIR/../test/build"
+# TEST_IMAGE_REGISTRY is the registry in the test-infra-* accounts where e2e test images are stored
+TEST_IMAGE_REGISTRY=${TEST_IMAGE_REGISTRY:-"617930562442.dkr.ecr.us-west-2.amazonaws.com"}
+
+source "$SCRIPT_DIR"/lib/cluster.sh
+source "$SCRIPT_DIR"/lib/canary.sh
+
+function run_ginkgo_test() {
+  local focus=$1
+  echo "Running ginkgo tests with focus: $focus"
+
+  (CGO_ENABLED=0 ginkgo $EXTRA_GINKGO_FLAGS --no-color --focus="$focus" -v --timeout 30m --fail-on-pending $GINKGO_TEST_BUILD/cni.test -- \
+    --cluster-kubeconfig="$KUBE_CONFIG_PATH" \
+    --cluster-name="$CLUSTER_NAME" \
+    --aws-region="$REGION" \
+    --aws-vpc-id="$VPC_ID" \
+    --ng-name-label-key="kubernetes.io/os" \
+    --ng-name-label-val="linux" \
+    --test-image-registry="$TEST_IMAGE_REGISTRY")
+}
+
+load_cluster_details
+
+run_ginkgo_test "STATIC_CANARY"
+
+echo "all tests ran successfully in $(($SECONDS / 60)) minutes and $(($SECONDS % 60)) seconds"
\ No newline at end of file
diff --git a/test/framework/resources/k8s/manifest/container.go b/test/framework/resources/k8s/manifest/container.go
index 3effc7f768..4afdb6d295 100644
--- a/test/framework/resources/k8s/manifest/container.go
+++ b/test/framework/resources/k8s/manifest/container.go
@@ -28,6 +28,7 @@ type Container struct {
 	probe           *v1.Probe
 	ports           []v1.ContainerPort
 	securityContext *v1.SecurityContext
+	env             []v1.EnvVar
 }
 
 func NewBusyBoxContainerBuilder(testImageRegistry string) *Container {
@@ -101,6 +102,11 @@ func (w *Container) Command(cmd []string) *Container {
 	return w
 }
 
+func (w *Container) EnvVar(env []v1.EnvVar) *Container {
+	w.env = env
+	return w
+}
+
 func (w *Container) Args(arg []string) *Container {
 	w.args = arg
 	return w
 }
@@ -126,5 +132,6 @@ func (w *Container) Build() v1.Container {
 		LivenessProbe:   w.probe,
 		Ports:           w.ports,
 		SecurityContext: w.securityContext,
+		Env:             w.env,
 	}
 }
diff --git a/test/framework/resources/k8s/manifest/daemonset.go b/test/framework/resources/k8s/manifest/daemonset.go
new file mode 100644
index 0000000000..2d8a9720a9
--- /dev/null
+++ b/test/framework/resources/k8s/manifest/daemonset.go
@@ -0,0 +1,123 @@
+// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"). You may
+// not use this file except in compliance with the License. A copy of the
+// License is located at
+//
+// http://aws.amazon.com/apache2.0/
+//
+// or in the "license" file accompanying this file. This file is distributed
+// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+// express or implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package manifest
+
+import (
+	"github.com/aws/amazon-vpc-cni-k8s/test/framework/utils"
+	"github.com/aws/aws-sdk-go/aws"
+	v1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+type DaemonsetBuilder struct {
+	namespace              string
+	name                   string
+	container              corev1.Container
+	labels                 map[string]string
+	nodeSelector           map[string]string
+	terminationGracePeriod int
+	hostNetwork            bool
+	volume                 []corev1.Volume
+	volumeMount            []corev1.VolumeMount
+}
+
+func NewDefaultDaemonsetBuilder() *DaemonsetBuilder {
+	return &DaemonsetBuilder{
+		namespace:              utils.DefaultTestNamespace,
+		terminationGracePeriod: 1,
+		labels:                 map[string]string{"role": "test"},
+		nodeSelector:           map[string]string{"kubernetes.io/os": "linux"},
+	}
+}
+
+func (d *DaemonsetBuilder) Labels(labels map[string]string) *DaemonsetBuilder {
+	d.labels = labels
+	return d
+}
+
+func (d *DaemonsetBuilder) NodeSelector(labelKey string, labelVal string) *DaemonsetBuilder {
+	if labelKey != "" {
+		d.nodeSelector[labelKey] = labelVal
+	}
+	return d
+}
+
+func (d *DaemonsetBuilder) Namespace(namespace string) *DaemonsetBuilder {
+	d.namespace = namespace
+	return d
+}
+
+func (d *DaemonsetBuilder) TerminationGracePeriod(tg int) *DaemonsetBuilder {
+	d.terminationGracePeriod = tg
+	return d
+}
+
+func (d *DaemonsetBuilder) Name(name string) *DaemonsetBuilder {
+	d.name = name
+	return d
+}
+
+func (d *DaemonsetBuilder) Container(container corev1.Container) *DaemonsetBuilder {
+	d.container = container
+	return d
+}
+
+func (d *DaemonsetBuilder) PodLabel(labelKey string, labelValue string) *DaemonsetBuilder {
+	d.labels[labelKey] = labelValue
+	return d
+}
+
+func (d *DaemonsetBuilder) HostNetwork(hostNetwork bool) *DaemonsetBuilder {
+	d.hostNetwork = hostNetwork
+	return d
+}
+
+func (d *DaemonsetBuilder) MountVolume(volume []corev1.Volume, volumeMount []corev1.VolumeMount) *DaemonsetBuilder {
+	d.volume = volume
+	d.volumeMount = volumeMount
+	return d
+}
+
+func (d *DaemonsetBuilder) Build() *v1.DaemonSet {
+	daemonSet := &v1.DaemonSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      d.name,
+			Namespace: d.namespace,
+			Labels:    d.labels,
+		},
+		Spec: v1.DaemonSetSpec{
+			Selector: &metav1.LabelSelector{
+				MatchLabels: d.labels,
+			},
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: d.labels,
+				},
+				Spec: corev1.PodSpec{
+					HostNetwork:                   d.hostNetwork,
+					NodeSelector:                  d.nodeSelector,
+					Containers:                    []corev1.Container{d.container},
+					TerminationGracePeriodSeconds: aws.Int64(int64(d.terminationGracePeriod)),
+				},
+			},
+		},
+	}
+
+	if len(d.volume) > 0 && len(d.volumeMount) > 0 {
+		daemonSet.Spec.Template.Spec.Volumes = d.volume
+		daemonSet.Spec.Template.Spec.Containers[0].VolumeMounts = d.volumeMount
+	}
+	return daemonSet
+}
diff --git a/test/framework/resources/k8s/resources/daemonset.go b/test/framework/resources/k8s/resources/daemonset.go
index aa5e8d5553..e6a9c2eca9 100644
--- a/test/framework/resources/k8s/resources/daemonset.go
+++ b/test/framework/resources/k8s/resources/daemonset.go
@@ -16,9 +16,11 @@ package resources
 import (
 	"context"
 	"errors"
+	"time"
 
 	"github.com/aws/amazon-vpc-cni-k8s/test/framework/utils"
 	v1 "k8s.io/api/apps/v1"
+	k8sErrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 type DaemonSetManager interface {
 	GetDaemonSet(namespace string, name string) (*v1.DaemonSet, error)
+	CreateAndWaitTillDaemonSetIsReady(daemonSet *v1.DaemonSet, timeout time.Duration) (*v1.DaemonSet, error)
 	UpdateAndWaitTillDaemonSetReady(old *v1.DaemonSet, new *v1.DaemonSet) (*v1.DaemonSet, error)
 	CheckIfDaemonSetIsReady(namespace string, name string) error
+	DeleteAndWaitTillDaemonSetIsDeleted(daemonSet *v1.DaemonSet, timeout time.Duration) error
 }
 
 type defaultDaemonSetManager struct {
 	k8sClient client.Client
 }
 
@@ -38,6 +44,24 @@ func NewDefaultDaemonSetManager(k8sClient client.Client) DaemonSetManager {
 	return &defaultDaemonSetManager{k8sClient: k8sClient}
 }
 
+// CreateAndWaitTillDaemonSetIsReady creates the DaemonSet and waits until a pod is ready on every selected node.
+func (d *defaultDaemonSetManager) CreateAndWaitTillDaemonSetIsReady(daemonSet *v1.DaemonSet, timeout time.Duration) (*v1.DaemonSet, error) {
+	ctx := context.Background()
+	err := d.k8sClient.Create(ctx, daemonSet)
+	if err != nil {
+		return nil, err
+	}
+
+	// Allow for the cache to sync
+	time.Sleep(utils.PollIntervalShort)
+
+	err = d.CheckIfDaemonSetIsReady(daemonSet.Namespace, daemonSet.Name)
+	if err != nil {
+		return nil, err
+	}
+
+	return daemonSet, nil
+}
+
 func (d *defaultDaemonSetManager) GetDaemonSet(namespace string, name string) (*v1.DaemonSet, error) {
 	ctx := context.Background()
 	daemonSet := &v1.DaemonSet{}
@@ -94,3 +118,28 @@ func (d *defaultDaemonSetManager) CheckIfDaemonSetIsReady(namespace string, name
 	}, ctx.Done())
 }
+
+// DeleteAndWaitTillDaemonSetIsDeleted deletes the DaemonSet and polls until it is gone
+// from the API server, or until the supplied timeout expires.
+func (d *defaultDaemonSetManager) DeleteAndWaitTillDaemonSetIsDeleted(daemonSet *v1.DaemonSet, timeout time.Duration) error {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	err := d.k8sClient.Delete(ctx, daemonSet)
+	if k8sErrors.IsNotFound(err) {
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+
+	observed := &v1.DaemonSet{}
+	return wait.PollImmediateUntil(utils.PollIntervalShort, func() (bool, error) {
+		if err := d.k8sClient.Get(ctx, utils.NamespacedName(daemonSet), observed); err != nil {
+			if k8sErrors.IsNotFound(err) {
+				return true, nil
+			}
+			return false, err
+		}
+		return false, nil
+	}, ctx.Done())
+}
diff --git a/test/integration/cni/pod_traffic_across_az_test.go b/test/integration/cni/pod_traffic_across_az_test.go
new file mode 100644
index 0000000000..753744a55f
--- /dev/null
+++ b/test/integration/cni/pod_traffic_across_az_test.go
@@ -0,0 +1,271 @@
+package cni
+
+import (
+	"fmt"
+	"strconv"
+
+	"github.com/aws/amazon-vpc-cni-k8s/test/framework/resources/k8s/manifest"
+	"github.com/aws/amazon-vpc-cni-k8s/test/framework/utils"
+	"github.com/aws/amazon-vpc-cni-k8s/test/integration/common"
+	"github.com/aws/aws-sdk-go/service/ec2"
+	coreV1 "k8s.io/api/core/v1"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	v1 "k8s.io/api/apps/v1"
+)
+
+var (
+	retries = 3
+)
+
+// Tests pod networking across AZs. It is similar to the pod connectivity test, but it launches a
+// DaemonSet so that there is a pod on each node across AZs. It then tests connectivity between
+// pods on different nodes across AZs.
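+// Server pods are keyed by the node's well-known topology.kubernetes.io/zone label, and
+// connectivity is then checked pairwise between every pair of distinct AZs.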
+var _ = Describe("[STATIC_CANARY] test pod networking", FlakeAttempts(retries), func() { + + var ( + err error + serverPort int + protocol string + + // The command to run on server pods, to allow incoming + // connections for different traffic type + serverListenCmd []string + // Arguments to the server listen command + serverListenCmdArgs []string + + // The function that generates command which will be sent from + // tester pod to receiver pod + testConnectionCommandFunc func(serverPod coreV1.Pod, port int) []string + + // Expected stdout from the exec command on testing connection + // from tester to server + testerExpectedStdOut string + // Expected stderr from the exec command on testing connection + // from tester to server + testerExpectedStdErr string + + // Daemonset to run on node + testDaemonSet *v1.DaemonSet + + // Map of AZ name, string to pod of testDaemonSet + azToTestPod map[string]coreV1.Pod + ) + + JustBeforeEach(func() { + By("authorizing security group ingress on instance security group") + err = f.CloudServices.EC2(). + AuthorizeSecurityGroupIngress(instanceSecurityGroupID, protocol, serverPort, serverPort, "0.0.0.0/0") + Expect(err).ToNot(HaveOccurred()) + + By("authorizing security group egress on instance security group") + err = f.CloudServices.EC2(). + AuthorizeSecurityGroupEgress(instanceSecurityGroupID, protocol, serverPort, serverPort, "0.0.0.0/0") + Expect(err).ToNot(HaveOccurred()) + + netcatContainer := manifest. + NewNetCatAlpineContainer(f.Options.TestImageRegistry). + Command(serverListenCmd). + Args(serverListenCmdArgs). + Build() + + By("creating a server DaemonSet on primary node") + + testDaemonSet = manifest. + NewDefaultDaemonsetBuilder(). + Container(netcatContainer). + PodLabel("role", "az-test"). + Name("netcat-daemonset"). + Build() + + _, err = f.K8sResourceManagers.DaemonSetManager().CreateAndWaitTillDaemonSetIsReady(testDaemonSet, utils.DefaultDeploymentReadyTimeout) + Expect(err).ToNot(HaveOccurred()) + + By(fmt.Sprintf("getting the node with the node label key %s and value %s", + f.Options.NgNameLabelKey, f.Options.NgNameLabelVal)) + + nodes, err := f.K8sResourceManagers.NodeManager().GetNodes(f.Options.NgNameLabelKey, f.Options.NgNameLabelVal) + + Expect(err).ToNot(HaveOccurred()) + + azToTestPod = GetAZtoPod(nodes) + }) + + JustAfterEach(func() { + By("revoking security group ingress on instance security group") + err = f.CloudServices.EC2(). + RevokeSecurityGroupIngress(instanceSecurityGroupID, protocol, serverPort, serverPort, "0.0.0.0/0") + Expect(err).ToNot(HaveOccurred()) + + By("revoking security group egress on instance security group") + err = f.CloudServices.EC2(). + RevokeSecurityGroupEgress(instanceSecurityGroupID, protocol, serverPort, serverPort, "0.0.0.0/0") + Expect(err).ToNot(HaveOccurred()) + + By("deleting the Daemonset.") + err = f.K8sResourceManagers.DaemonSetManager().DeleteAndWaitTillDaemonSetIsDeleted(testDaemonSet, utils.DefaultDeploymentReadyTimeout) + Expect(err).ToNot(HaveOccurred()) + }) + + Context("While testing connectivity across AZ", func() { + + BeforeEach(func() { + serverPort = 2273 + protocol = ec2.ProtocolTcp + // Test tcp connection using netcat + serverListenCmd = []string{"nc"} + // The nc flag "-l" for listen mode, "-k" to keep server up and not close + // connection after each connection + serverListenCmdArgs = []string{"-k", "-l", strconv.Itoa(serverPort)} + // netcat verbose output is being redirected to stderr instead of stdout + testerExpectedStdErr = "succeeded!" 
+			testerExpectedStdOut = ""
+
+			// The nc flag "-v" enables verbose output; "-w n" times out after n seconds
+			testConnectionCommandFunc = func(receiverPod coreV1.Pod, port int) []string {
+				return []string{"nc", "-v", "-w2", receiverPod.Status.PodIP, strconv.Itoa(port)}
+			}
+		})
+
+		It("should allow TCP traffic across AZs", func() {
+			CheckConnectivityBetweenPods(azToTestPod, serverPort, testerExpectedStdOut, testerExpectedStdErr, testConnectionCommandFunc)
+		})
+	})
+})
+
+func GetAZtoPod(nodes coreV1.NodeList) map[string]coreV1.Pod {
+	// Map of AZ name to a pod from the DaemonSet running on a node in that AZ
+	azToPod := make(map[string]coreV1.Pod)
+	for i := range nodes.Items {
+		// The node label key "topology.kubernetes.io/zone" is a well-known label populated by
+		// the cloud controller manager; it is guaranteed to be present and to hold the AZ name.
+		// Ref https://kubernetes.io/docs/reference/labels-annotations-taints/#topologykubernetesiozone
+		azName := nodes.Items[i].ObjectMeta.Labels["topology.kubernetes.io/zone"]
+		interfaceToPodList := common.GetPodsOnPrimaryAndSecondaryInterface(nodes.Items[i], "role", "az-test", f)
+		// It doesn't matter which ENI the pod is on, as long as it is on the node
+		if len(interfaceToPodList.PodsOnSecondaryENI) > 0 {
+			azToPod[azName] = interfaceToPodList.PodsOnSecondaryENI[0]
+		}
+		if len(interfaceToPodList.PodsOnPrimaryENI) > 0 {
+			azToPod[azName] = interfaceToPodList.PodsOnPrimaryENI[0]
+		}
+	}
+	return azToPod
+}
+
+var _ = Describe("[STATIC_CANARY2] API Server Connectivity from AZs", FlakeAttempts(retries), func() {
+
+	var (
+		err           error
+		testDaemonSet *v1.DaemonSet
+
+		// Map of AZ name to a pod of testDaemonSet in that AZ
+		azToPod map[string]coreV1.Pod
+	)
+
+	JustBeforeEach(func() {
+		serverContainer := manifest.
+			NewCurlContainer().
+			Command([]string{
+				"sleep",
+				"3600",
+			}).
+			Build()
+
+		By("creating a curl client DaemonSet so that a pod runs on every node")
+
+		testDaemonSet = manifest.
+			NewDefaultDaemonsetBuilder().
+			Container(serverContainer).
+			PodLabel("role", "az-test").
+			Name("api-server-connectivity-daemonset").
+			Build()
+
+		_, err = f.K8sResourceManagers.DaemonSetManager().CreateAndWaitTillDaemonSetIsReady(testDaemonSet, utils.DefaultDeploymentReadyTimeout)
+		Expect(err).ToNot(HaveOccurred())
+
+		By(fmt.Sprintf("getting the nodes with the node label key %s and value %s",
+			f.Options.NgNameLabelKey, f.Options.NgNameLabelVal))
+
+		nodes, err := f.K8sResourceManagers.NodeManager().GetNodes(f.Options.NgNameLabelKey, f.Options.NgNameLabelVal)
+		Expect(err).ToNot(HaveOccurred())
+
+		azToPod = GetAZtoPod(nodes)
+	})
+
+	JustAfterEach(func() {
+		By("deleting the DaemonSet")
+		err = f.K8sResourceManagers.DaemonSetManager().DeleteAndWaitTillDaemonSetIsDeleted(testDaemonSet, utils.DefaultDeploymentReadyTimeout)
+		Expect(err).ToNot(HaveOccurred())
+	})
+
+	Context("While testing API Server connectivity", func() {
+
+		It("should connect to the API Server", func() {
+			describeClusterOutput, err := f.CloudServices.EKS().DescribeCluster(f.Options.ClusterName)
+			Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error while describing cluster %s to find the API Server endpoint",
%s", f.Options.ClusterName)) + APIServerNLBEndpoint := fmt.Sprintf("%s/api", *describeClusterOutput.Cluster.Endpoint) + APIServerInternalEndpoint := "https://kubernetes.default.svc/api" + + CheckAPIServerConnectivityFromPods(azToPod, APIServerInternalEndpoint) + + CheckAPIServerConnectivityFromPods(azToPod, APIServerNLBEndpoint) + }) + + }) +}) + +func CheckAPIServerConnectivityFromPods(azToPod map[string]coreV1.Pod, api_server_url string) { + // Standard paths for SA token, CA cert and API Server URL + token_path := "/var/run/secrets/kubernetes.io/serviceaccount/token" + cacert := "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + + for az := range azToPod { + fmt.Printf("Testing API Server %s Connectivity from AZ %s \n", api_server_url, az) + sa_token := []string{"cat", token_path} + token_value, _, err := RunCommandOnPod(azToPod[az], sa_token) + + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("failed to get SA token for pod in %s", az)) + + bearer := fmt.Sprintf("Authorization: Bearer %s", token_value) + + test_api_server_connectivity := []string{"curl", "--cacert", cacert, "--header", bearer, "-X", "GET", + api_server_url, + } + + api_server_stdout, _, err := RunCommandOnPod(azToPod[az], test_api_server_connectivity) + // Descriptive error message on failure to connect to API Server from particular AZ. + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Error while connecting to API Server from %s", az)) + Expect(api_server_stdout).ToNot(BeEmpty()) + Expect(api_server_stdout).To(ContainSubstring("APIVersions")) + fmt.Printf("API Server %s Connectivity from AZ %s was successful.\n", api_server_url, az) + } +} + +func CheckConnectivityBetweenPods(azToPod map[string]coreV1.Pod, port int, testerExpectedStdOut string, testerExpectedStdErr string, getTestCommandFunc func(serverPod coreV1.Pod, port int) []string) { + + By("checking connection on same node, primary to primary") + + for az1 := range azToPod { + for az2 := range azToPod { + if az1 != az2 { + fmt.Printf("Testing Connectivity from Pod IP1 %s (%s) to Pod IP2 %s (%s) \n", + azToPod[az1].Status.PodIP, az1, azToPod[az2].Status.PodIP, az2) + testConnectivity(azToPod[az1], azToPod[az2], testerExpectedStdOut, testerExpectedStdErr, port, getTestCommandFunc) + } + } + } +} + +func RunCommandOnPod(receiverPod coreV1.Pod, command []string) (string, string, error) { + count := retries + for { + stdout, stdrr, err := f.K8sResourceManagers.PodManager(). + PodExec(receiverPod.Namespace, receiverPod.Name, command) + count -= 1 + if count == 0 || err == nil { + return stdout, stdrr, err + } + } +}