diff --git a/test/framework/framework.go b/test/framework/framework.go index fab388acae..34f1d08b19 100644 --- a/test/framework/framework.go +++ b/test/framework/framework.go @@ -74,6 +74,6 @@ func New(options Options) *Framework { Options: options, K8sClient: k8sClient, CloudServices: aws.NewCloud(cloudConfig), - K8sResourceManagers: k8s.NewResourceManager(k8sClient), + K8sResourceManagers: k8s.NewResourceManager(k8sClient, k8sSchema, config), } } diff --git a/test/framework/resources/aws/services/ec2.go b/test/framework/resources/aws/services/ec2.go index 87052d5757..0f65afbfef 100644 --- a/test/framework/resources/aws/services/ec2.go +++ b/test/framework/resources/aws/services/ec2.go @@ -24,13 +24,18 @@ import ( type EC2 interface { DescribeInstanceType(instanceType string) ([]*ec2.InstanceTypeInfo, error) + DescribeInstance(instanceID string) (*ec2.Instance, error) + AuthorizeSecurityGroupIngress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error + RevokeSecurityGroupIngress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error + AuthorizeSecurityGroupEgress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error + RevokeSecurityGroupEgress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error } type defaultEC2 struct { ec2iface.EC2API } -func (d defaultEC2) DescribeInstanceType(instanceType string) ([]*ec2.InstanceTypeInfo, error) { +func (d *defaultEC2) DescribeInstanceType(instanceType string) ([]*ec2.InstanceTypeInfo, error) { describeInstanceTypeIp := &ec2.DescribeInstanceTypesInput{ InstanceTypes: aws.StringSlice([]string{instanceType}), } @@ -44,6 +49,98 @@ func (d defaultEC2) DescribeInstanceType(instanceType string) ([]*ec2.InstanceTy return describeInstanceOp.InstanceTypes, nil } +func (d *defaultEC2) DescribeInstance(instanceID string) (*ec2.Instance, error) { + describeInstanceInput := &ec2.DescribeInstancesInput{ + InstanceIds: aws.StringSlice([]string{instanceID}), + } + describeInstanceOutput, err := d.EC2API.DescribeInstances(describeInstanceInput) + if err != nil { + return nil, err + } + if describeInstanceOutput == nil || len(describeInstanceOutput.Reservations) == 0 || + len(describeInstanceOutput.Reservations[0].Instances) == 0 { + return nil, fmt.Errorf("failed to find instance %s", instanceID) + } + return describeInstanceOutput.Reservations[0].Instances[0], nil +} + +func (d *defaultEC2) AuthorizeSecurityGroupIngress(groupID string, protocol string, + fromPort int, toPort int, cidrIP string) error { + ipPermissions := &ec2.IpPermission{ + FromPort: aws.Int64(int64(fromPort)), + ToPort: aws.Int64(int64(toPort)), + IpProtocol: aws.String(protocol), + IpRanges: []*ec2.IpRange{ + { + CidrIp: aws.String(cidrIP), + }, + }, + } + authorizeSecurityGroupIngressInput := &ec2.AuthorizeSecurityGroupIngressInput{ + GroupId: aws.String(groupID), + IpPermissions: []*ec2.IpPermission{ipPermissions}, + } + _, err := d.EC2API.AuthorizeSecurityGroupIngress(authorizeSecurityGroupIngressInput) + return err +} + +func (d *defaultEC2) RevokeSecurityGroupIngress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error { + ipPermissions := &ec2.IpPermission{ + FromPort: aws.Int64(int64(fromPort)), + ToPort: aws.Int64(int64(toPort)), + IpProtocol: aws.String(protocol), + IpRanges: []*ec2.IpRange{ + { + CidrIp: aws.String(cidrIP), + }, + }, + } + revokeSecurityGroupIngressInput := &ec2.RevokeSecurityGroupIngressInput{ + GroupId: aws.String(groupID), + IpPermissions: []*ec2.IpPermission{ipPermissions}, + } + _, err := d.EC2API.RevokeSecurityGroupIngress(revokeSecurityGroupIngressInput) + return err +} + +func (d *defaultEC2) AuthorizeSecurityGroupEgress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error { + ipPermissions := &ec2.IpPermission{ + FromPort: aws.Int64(int64(fromPort)), + ToPort: aws.Int64(int64(toPort)), + IpProtocol: aws.String(protocol), + IpRanges: []*ec2.IpRange{ + { + CidrIp: aws.String(cidrIP), + }, + }, + } + authorizeSecurityGroupEgressInput := &ec2.AuthorizeSecurityGroupEgressInput{ + GroupId: aws.String(groupID), + IpPermissions: []*ec2.IpPermission{ipPermissions}, + } + _, err := d.EC2API.AuthorizeSecurityGroupEgress(authorizeSecurityGroupEgressInput) + return err +} + +func (d *defaultEC2) RevokeSecurityGroupEgress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error { + ipPermissions := &ec2.IpPermission{ + FromPort: aws.Int64(int64(fromPort)), + ToPort: aws.Int64(int64(toPort)), + IpProtocol: aws.String(protocol), + IpRanges: []*ec2.IpRange{ + { + CidrIp: aws.String(cidrIP), + }, + }, + } + revokeSecurityGroupEgressInput := &ec2.RevokeSecurityGroupEgressInput{ + GroupId: aws.String(groupID), + IpPermissions: []*ec2.IpPermission{ipPermissions}, + } + _, err := d.EC2API.RevokeSecurityGroupEgress(revokeSecurityGroupEgressInput) + return err +} + func NewEC2(session *session.Session) EC2 { return &defaultEC2{ EC2API: ec2.New(session), diff --git a/test/framework/resources/k8s/manager.go b/test/framework/resources/k8s/manager.go index b2e9a7e6a5..b6d749ee39 100644 --- a/test/framework/resources/k8s/manager.go +++ b/test/framework/resources/k8s/manager.go @@ -16,6 +16,8 @@ package k8s import ( "github.com/aws/amazon-vpc-cni-k8s/test/framework/resources/k8s/resources" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -26,6 +28,8 @@ type ResourceManagers interface { NamespaceManager() resources.NamespaceManager ServiceManager() resources.ServiceManager NodeManager() resources.NodeManager + PodManager() resources.PodManager + DaemonSetManager() resources.DaemonSetManager } type defaultManager struct { @@ -35,9 +39,12 @@ type defaultManager struct { namespaceManager resources.NamespaceManager serviceManager resources.ServiceManager nodeManager resources.NodeManager + podManager resources.PodManager + daemonSetManager resources.DaemonSetManager } -func NewResourceManager(k8sClient client.DelegatingClient) ResourceManagers { +func NewResourceManager(k8sClient client.DelegatingClient, + scheme *runtime.Scheme, config *rest.Config) ResourceManagers { return &defaultManager{ jobManager: resources.NewDefaultJobManager(k8sClient), deploymentManager: resources.NewDefaultDeploymentManager(k8sClient), @@ -45,6 +52,8 @@ func NewResourceManager(k8sClient client.DelegatingClient) ResourceManagers { namespaceManager: resources.NewDefaultNamespaceManager(k8sClient), serviceManager: resources.NewDefaultServiceManager(k8sClient), nodeManager: resources.NewDefaultNodeManager(k8sClient), + podManager: resources.NewDefaultPodManager(k8sClient, scheme, config), + daemonSetManager: resources.NewDefaultDaemonSetManager(k8sClient), } } @@ -71,3 +80,11 @@ func (m *defaultManager) ServiceManager() resources.ServiceManager { func (m *defaultManager) NodeManager() resources.NodeManager { return m.nodeManager } + +func (m *defaultManager) PodManager() resources.PodManager { + return m.podManager +} + +func (m *defaultManager) DaemonSetManager() resources.DaemonSetManager { + return m.daemonSetManager +} diff --git a/test/framework/resources/k8s/manifest/deployment.go b/test/framework/resources/k8s/manifest/deployment.go index cfaef729ac..a835f5f0dd 100644 --- a/test/framework/resources/k8s/manifest/deployment.go +++ b/test/framework/resources/k8s/manifest/deployment.go @@ -14,7 +14,7 @@ package manifest import ( - utils "github.com/aws/amazon-vpc-cni-k8s/test/framework/utils" + "github.com/aws/amazon-vpc-cni-k8s/test/framework/utils" "github.com/aws/aws-sdk-go/aws" v1 "k8s.io/api/apps/v1" @@ -38,11 +38,19 @@ func NewBusyBoxDeploymentBuilder() *DeploymentBuilder { name: "deployment-test", replicas: 10, container: NewBusyBoxContainerBuilder().Build(), - labels: map[string]string{"d": "d"}, + labels: map[string]string{"role": "test"}, terminationGracePeriod: 0, } } +func NewDefaultDeploymentBuilder() *DeploymentBuilder { + return &DeploymentBuilder{ + namespace: utils.DefaultTestNamespace, + terminationGracePeriod: 0, + labels: map[string]string{"role": "test"}, + } +} + func (d *DeploymentBuilder) Namespace(namespace string) *DeploymentBuilder { d.namespace = namespace return d diff --git a/test/framework/resources/k8s/resources/daemonset.go b/test/framework/resources/k8s/resources/daemonset.go new file mode 100644 index 0000000000..dd6994a990 --- /dev/null +++ b/test/framework/resources/k8s/resources/daemonset.go @@ -0,0 +1,69 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package resources + +import ( + "context" + + "github.com/aws/amazon-vpc-cni-k8s/test/framework/utils" + v1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type DaemonSetManager interface { + GetDaemonSet(namespace string, name string) (*v1.DaemonSet, error) + UpdateAndWaitTillDaemonSetReady(old *v1.DaemonSet, new *v1.DaemonSet) (*v1.DaemonSet, error) +} + +type defaultDaemonSetManager struct { + k8sClient client.DelegatingClient +} + +func NewDefaultDaemonSetManager(k8sClient client.DelegatingClient) DaemonSetManager { + return &defaultDaemonSetManager{k8sClient: k8sClient} +} + +func (d *defaultDaemonSetManager) GetDaemonSet(namespace string, name string) (*v1.DaemonSet, error) { + ctx := context.Background() + daemonSet := &v1.DaemonSet{} + err := d.k8sClient.Get(ctx, types.NamespacedName{ + Namespace: namespace, + Name: name, + }, daemonSet) + return daemonSet, err +} + +func (d *defaultDaemonSetManager) UpdateAndWaitTillDaemonSetReady(old *v1.DaemonSet, new *v1.DaemonSet) (*v1.DaemonSet, error) { + ctx := context.Background() + err := d.k8sClient.Patch(ctx, new, client.MergeFrom(old)) + if err != nil { + return nil, err + } + + observed := &v1.DaemonSet{} + return observed, wait.PollImmediateUntil(utils.PollIntervalShort, func() (bool, error) { + if err := d.k8sClient.Get(ctx, utils.NamespacedName(new), observed); err != nil { + return false, err + } + if observed.Status.NumberReady == (new.Status.DesiredNumberScheduled) && + observed.Status.NumberAvailable == (new.Status.DesiredNumberScheduled) && + observed.Status.UpdatedNumberScheduled == (new.Status.DesiredNumberScheduled) && + observed.Status.ObservedGeneration >= new.Generation { + return true, nil + } + return false, nil + }, ctx.Done()) +} diff --git a/test/framework/resources/k8s/resources/pod.go b/test/framework/resources/k8s/resources/pod.go new file mode 100644 index 0000000000..695cba603d --- /dev/null +++ b/test/framework/resources/k8s/resources/pod.go @@ -0,0 +1,105 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package resources + +import ( + "bytes" + "context" + "net/http" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +type PodManager interface { + PodExec(namespace string, name string, command []string) (string, string, error) + GetPodsWithLabelSelector(labelKey string, labelVal string) (v1.PodList, error) +} + +type defaultPodManager struct { + k8sClient client.DelegatingClient + k8sSchema *runtime.Scheme + config *rest.Config +} + +func NewDefaultPodManager(k8sClient client.DelegatingClient, k8sSchema *runtime.Scheme, + config *rest.Config) PodManager { + + return &defaultPodManager{ + k8sClient: k8sClient, + k8sSchema: k8sSchema, + config: config, + } +} + +func (d *defaultPodManager) PodExec(namespace string, name string, command []string) (string, string, error) { + pod := &v1.Pod{} + err := d.k8sClient.Get(context.Background(), types.NamespacedName{ + Namespace: namespace, + Name: name, + }, pod) + if err != nil { + return "", "", err + } + + gkv, err := apiutil.GVKForObject(pod, d.k8sSchema) + if err != nil { + return "", "", err + } + restClient, err := apiutil.RESTClientForGVK(gkv, d.config, serializer.NewCodecFactory(d.k8sSchema)) + if err != nil { + return "", "", err + } + + execOptions := &v1.PodExecOptions{ + Stdout: true, + Stderr: true, + Command: command, + } + + req := restClient.Post(). + Resource("pods"). + Name(pod.Name). + Namespace(pod.Namespace). + SubResource("exec"). + VersionedParams(execOptions, runtime.NewParameterCodec(d.k8sSchema)) + + exec, err := remotecommand.NewSPDYExecutor(d.config, http.MethodPost, req.URL()) + if err != nil { + return "", "", err + } + + var stdout, stderr bytes.Buffer + err = exec.Stream(remotecommand.StreamOptions{ + Stdout: &stdout, + Stderr: &stderr, + }) + + return stdout.String(), stderr.String(), err +} + +func (d *defaultPodManager) GetPodsWithLabelSelector(labelKey string, labelVal string) (v1.PodList, error) { + ctx := context.Background() + podList := v1.PodList{} + err := d.k8sClient.List(ctx, &podList, client.MatchingLabels{ + labelKey: labelVal, + }) + return podList, err +} diff --git a/test/framework/resources/k8s/utils/container.go b/test/framework/resources/k8s/utils/container.go new file mode 100644 index 0000000000..6d447d5aaa --- /dev/null +++ b/test/framework/resources/k8s/utils/container.go @@ -0,0 +1,83 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package utils + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" +) + +// AddOrUpdateEnvironmentVariable adds or updates existing Environment variable to the +// specified container name +func AddOrUpdateEnvironmentVariable(containers []v1.Container, containerName string, + envVars map[string]string) error { + + containerIndex := -1 + // Update existing environment variable first + for i, container := range containers { + if container.Name != containerName { + continue + } + containerIndex = i + for j, env := range container.Env { + if val, alreadyPresent := envVars[env.Name]; alreadyPresent { + container.Env[j].Value = val + // Delete, so we don't add the environment variable multiple times + delete(envVars, env.Name) + } + } + } + + if containerIndex < 0 { + return fmt.Errorf("failed to find container %s in the passed containers", + containerName) + } + + // Add the environment variables that were not already present + for key, val := range envVars { + containers[containerIndex].Env = append(containers[containerIndex].Env, + v1.EnvVar{ + Name: key, + Value: val, + }) + } + + return nil +} + +// RemoveEnvironmentVariables removes the environment variable from the specified container +func RemoveEnvironmentVariables(containers []v1.Container, containerName string, + envVars map[string]struct{}) error { + containerIndex := -1 + for i, container := range containers { + if container.Name != containerName { + continue + } + containerIndex = i + for j := 0; j < len(container.Env); j++ { + if _, ok := envVars[container.Env[j].Name]; ok { + container.Env = append(container.Env[:j], container.Env[j+1:]...) + j-- + } + } + } + + if containerIndex < 0 { + return fmt.Errorf("failed to find cotnainer %s in list of containers", + containerName) + } + + return nil +} diff --git a/test/framework/resources/k8s/utils/daemonset.go b/test/framework/resources/k8s/utils/daemonset.go new file mode 100644 index 0000000000..8e33dd06c0 --- /dev/null +++ b/test/framework/resources/k8s/utils/daemonset.go @@ -0,0 +1,70 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package utils + +import ( + "fmt" + + "github.com/aws/amazon-vpc-cni-k8s/test/framework" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + v1 "k8s.io/api/apps/v1" +) + +func AddEnvVarToDaemonSetAndWaitTillUpdated(f *framework.Framework, dsName string, dsNamespace string, + containerName string, envVars map[string]string) { + + ds := getDaemonSet(f, dsName, dsNamespace) + updatedDs := ds.DeepCopy() + + By(fmt.Sprintf("setting the environment variables on the ds to %+v", envVars)) + err := AddOrUpdateEnvironmentVariable(updatedDs.Spec.Template.Spec.Containers, + containerName, envVars) + Expect(err).ToNot(HaveOccurred()) + + waitTillDaemonSetUpdated(f, ds, updatedDs) +} + +func RemoveVarFromDaemonSetAndWaitTillUpdated(f *framework.Framework, dsName string, dsNamespace string, + containerName string, envVars map[string]struct{}) { + + ds := getDaemonSet(f, dsName, dsNamespace) + updatedDs := ds.DeepCopy() + + By(fmt.Sprintf("setting the environment variables on the ds to %+v", envVars)) + err := RemoveEnvironmentVariables(updatedDs.Spec.Template.Spec.Containers, + containerName, envVars) + Expect(err).ToNot(HaveOccurred()) + + waitTillDaemonSetUpdated(f, ds, updatedDs) +} + +func getDaemonSet(f *framework.Framework, dsName string, dsNamespace string) *v1.DaemonSet { + By(fmt.Sprintf("getting the %s daemon set in namesapce %s", dsName, dsNamespace)) + ds, err := f.K8sResourceManagers. + DaemonSetManager(). + GetDaemonSet(dsNamespace, dsName) + Expect(err).ToNot(HaveOccurred()) + return ds +} + +func waitTillDaemonSetUpdated(f *framework.Framework, oldDs *v1.DaemonSet, updatedDs *v1.DaemonSet) *v1.DaemonSet { + By("updating the daemon set with new environment variable") + updatedDs, err := f.K8sResourceManagers. + DaemonSetManager(). + UpdateAndWaitTillDaemonSetReady(oldDs, updatedDs) + Expect(err).ToNot(HaveOccurred()) + return updatedDs +} diff --git a/test/framework/resources/k8s/utils/node.go b/test/framework/resources/k8s/utils/node.go new file mode 100644 index 0000000000..4f4659966a --- /dev/null +++ b/test/framework/resources/k8s/utils/node.go @@ -0,0 +1,25 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package utils + +import ( + "strings" + + v1 "k8s.io/api/core/v1" +) + +func GetInstanceIDFromNode(node v1.Node) string { + id := strings.Split(node.Spec.ProviderID, "/") + return id[len(id)-1] +} diff --git a/test/go.sum b/test/go.sum index b4a771938d..ee29f01cc4 100644 --- a/test/go.sum +++ b/test/go.sum @@ -121,6 +121,7 @@ github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8 github.com/docker/docker v0.7.3-0.20190327010347-be7ac8be2ae0/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96 h1:cenwrSVm+Z7QLSV/BsnenAOcDXdX4cMv4wP0B/5QbPg= github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= diff --git a/test/integration-new/README.md b/test/integration-new/README.md new file mode 100644 index 0000000000..0f1a06c260 --- /dev/null +++ b/test/integration-new/README.md @@ -0,0 +1,45 @@ +##CNI Integration Test Suites + +The package contains automated integration tests suites for `amazon-vpc-cni-k8s` . + +###Prerequisites +The integration test requires +- At least 2 nodes in a node group. +- Nodes in the nodegroup shouldn't have existing pods. +- Ginkgo installed on your environment. To install `go get github.com/onsi/ginkgo/ginkgo` + +####Testing +Set the environment variables that will be passed to Ginkgo script. If you want to directly pass the arguments you can skip to next step. +``` +CLUSTER_NAME= +VPC_ID= +KUBECONFIG= +AWS_REGION= +NG_NAME_LABEL_KEY= +# Example, NG_NAME_LABEL_KEY="eks.amazonaws.com/nodegroup" +NG_NAME_LABEL_VAL= +# Example, NG_NAME_LABEL_VAL="nodegroup-name" +``` + +To run the test switch to the integration folder. For instance running the cni integration test from root of the project. +```bash +cd test/integration-new/cni +``` +Run Ginkgo test suite +```bash +ginkgo -v --failOnPending -- \ + --cluster-kubeconfig=$KUBECONFIG \ + --cluster-name=$CLUSTER_NAME \ + --aws-region=$AWS_REGION \ + --aws-vpc-id=$VPC_ID \ + --ng-name-label-val=$NG_NAME_LABEL_KEY \ + --ng-name-label-val=$NG_NAME_LABEL_VAL +``` + +###Future Work +Currently the package is named as `integraiton-new` because we already have `integration` directory with existing Ginkgo test cases with a separate `go.mod`. Once the older package is completely deprecated we will rename this package to `integration`. + + + + + diff --git a/test/integration-new/cni/pod_networking_suite_test.go b/test/integration-new/cni/pod_networking_suite_test.go new file mode 100644 index 0000000000..03d89b79fe --- /dev/null +++ b/test/integration-new/cni/pod_networking_suite_test.go @@ -0,0 +1,91 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package cni + +import ( + "fmt" + "testing" + + "github.com/aws/amazon-vpc-cni-k8s/test/framework" + k8sUtils "github.com/aws/amazon-vpc-cni-k8s/test/framework/resources/k8s/utils" + "github.com/aws/amazon-vpc-cni-k8s/test/framework/utils" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" +) + +const InstanceTypeNodeLabelKey = "beta.kubernetes.io/instance-type" + +var f *framework.Framework +var maxIPPerInterface int +var primaryNode v1.Node +var secondaryNode v1.Node +var instanceSecurityGroupID string + +func TestCNIPodNetworking(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "CNI Pod Networking Suite") +} + +var _ = BeforeSuite(func() { + f = framework.New(framework.GlobalOptions) + + By("creating test namespace") + f.K8sResourceManagers.NamespaceManager(). + CreateNamespace(utils.DefaultTestNamespace) + + By(fmt.Sprintf("getting the node with the node label key %s and value %s", + f.Options.NgNameLabelKey, f.Options.NgNameLabelVal)) + nodes, err := f.K8sResourceManagers.NodeManager().GetNodes(f.Options.NgNameLabelKey, f.Options.NgNameLabelVal) + Expect(err).ToNot(HaveOccurred()) + + By("verifying more than 1 nodes are present for the test") + Expect(len(nodes.Items)).Should(BeNumerically(">", 1)) + + // Set the primary and secondary node for testing + primaryNode = nodes.Items[0] + secondaryNode = nodes.Items[1] + + // Get the node security group + instanceID := k8sUtils.GetInstanceIDFromNode(primaryNode) + primaryInstance, err := f.CloudServices.EC2().DescribeInstance(instanceID) + Expect(err).ToNot(HaveOccurred()) + + // This won't work if the first SG is only associated with the primary instance. + // Need a robust substring in the SGP name to identify node SGP + instanceSecurityGroupID = *primaryInstance.NetworkInterfaces[0].Groups[0].GroupId + + By("getting the instance type from node label " + InstanceTypeNodeLabelKey) + instanceType := primaryNode.Labels[InstanceTypeNodeLabelKey] + + By("getting the network interface details from ec2") + instanceOutput, err := f.CloudServices.EC2().DescribeInstanceType(instanceType) + Expect(err).ToNot(HaveOccurred()) + + maxIPPerInterface = int(*instanceOutput[0].NetworkInfo.Ipv4AddressesPerInterface) + + // Set the WARM_ENI_TARGET to 0 to prevent all pods being scheduled on secondary ENI + k8sUtils.AddEnvVarToDaemonSetAndWaitTillUpdated(f, "aws-node", "kube-system", + "aws-node", map[string]string{"WARM_IP_TARGET": "3", "WARM_ENI_TARGET": "0"}) +}) + +var _ = AfterSuite(func() { + By("deleting test namespace") + f.K8sResourceManagers.NamespaceManager(). + DeleteAndWaitTillNamespaceDeleted(utils.DefaultTestNamespace) + + k8sUtils.RemoveVarFromDaemonSetAndWaitTillUpdated(f, "aws-node", "kube-system", + "aws-node", map[string]struct{}{"WARM_IP_TARGET": {}, "WARM_ENI_TARGET": {}}) +}) diff --git a/test/integration-new/cni/pod_networking_test.go b/test/integration-new/cni/pod_networking_test.go new file mode 100644 index 0000000000..0377b55d90 --- /dev/null +++ b/test/integration-new/cni/pod_networking_test.go @@ -0,0 +1,397 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package cni + +import ( + "fmt" + "strconv" + + "github.com/aws/amazon-vpc-cni-k8s/test/framework/resources/k8s/manifest" + k8sUtils "github.com/aws/amazon-vpc-cni-k8s/test/framework/resources/k8s/utils" + + "github.com/aws/aws-sdk-go/service/ec2" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + v1 "k8s.io/api/apps/v1" + coreV1 "k8s.io/api/core/v1" +) + +// Verifies network connectivity across Pods placed on different combination of +// primary and second Elastic Networking Interface on two nodes. The test verifies +// different traffic type for instance TCP, UDP, ICMP +var _ = Describe("test pod networking", func() { + + var ( + err error + // The command to run on server pods, to allow incoming + // connections for different traffic type + serverListenCmd []string + // Arguments to the server listen command + serverListenCmdArgs []string + // The function that generates command which will be sent from + // tester pod to receiver pod + testConnectionCommandFunc func(serverPod coreV1.Pod, port int) []string + // The functions reinforces that the positive test is working as + // expected by creating a negative test command that should fail + testFailedConnectionCommandFunc func(serverPod coreV1.Pod, port int) []string + // Expected stdout from the exec command on testing connection + // from tester to server + testerExpectedStdOut string + // Expected stderr from the exec command on testing connection + // from tester to server + testerExpectedStdErr string + // The port on which server is listening for new connections + serverPort int + // Protocol for establishing connection to server + protocol string + + // Primary node server deployment + primaryNodeDeployment *v1.Deployment + // Secondary node Server deployment + secondaryNodeDeployment *v1.Deployment + + // Map of Pods placed on primary/secondary ENI IP on primary node + interfaceToPodListOnPrimaryNode InterfaceTypeToPodList + // Map of Pods placed on primary/secondary ENI IP on secondary node + interfaceToPodListOnSecondaryNode InterfaceTypeToPodList + ) + + JustBeforeEach(func() { + By("authorizing security group ingress on instance security group") + err = f.CloudServices.EC2(). + AuthorizeSecurityGroupIngress(instanceSecurityGroupID, protocol, serverPort, serverPort, "0.0.0.0/0") + Expect(err).ToNot(HaveOccurred()) + + By("authorizing security group egress on instance security group") + err = f.CloudServices.EC2(). + AuthorizeSecurityGroupEgress(instanceSecurityGroupID, protocol, serverPort, serverPort, "0.0.0.0/0") + Expect(err).ToNot(HaveOccurred()) + + serverContainer := manifest. + NewNetCatAlpineContainer(). + Command(serverListenCmd). + Args(serverListenCmdArgs). + Build() + + By("creating server deployment on the primary node") + primaryNodeDeployment = manifest. + NewDefaultDeploymentBuilder(). + Container(serverContainer). + Replicas(maxIPPerInterface*2). // X2 so Pods are created on secondary ENI too + NodeName(primaryNode.Name). + PodLabel("node", "primary"). + Name("primary-node-server"). + Build() + + primaryNodeDeployment, err = f.K8sResourceManagers. + DeploymentManager(). + CreateAndWaitTillDeploymentIsReady(primaryNodeDeployment) + Expect(err).ToNot(HaveOccurred()) + + interfaceToPodListOnPrimaryNode = + GetPodsOnPrimaryAndSecondaryInterface(primaryNode, "node", "primary") + + // At least two Pods should be placed on the Primary and Secondary Interface + // on the Primary and Secondary Node in order to test all possible scenarios + Expect(len(interfaceToPodListOnPrimaryNode.PodsOnPrimaryENI)). + Should(BeNumerically(">", 1)) + Expect(len(interfaceToPodListOnPrimaryNode.PodsOnSecondaryENI)). + Should(BeNumerically(">", 1)) + + By("creating server deployment on secondary node") + secondaryNodeDeployment = manifest. + NewDefaultDeploymentBuilder(). + Container(serverContainer). + Replicas(maxIPPerInterface*2). // X2 so Pods are created on secondary ENI too + NodeName(secondaryNode.Name). + PodLabel("node", "secondary"). + Name("secondary-node-server"). + Build() + + secondaryNodeDeployment, err = f.K8sResourceManagers. + DeploymentManager(). + CreateAndWaitTillDeploymentIsReady(secondaryNodeDeployment) + Expect(err).ToNot(HaveOccurred()) + + interfaceToPodListOnSecondaryNode = + GetPodsOnPrimaryAndSecondaryInterface(secondaryNode, "node", "secondary") + + // Same reason as mentioned above + Expect(len(interfaceToPodListOnSecondaryNode.PodsOnPrimaryENI)). + Should(BeNumerically(">", 1)) + Expect(len(interfaceToPodListOnSecondaryNode.PodsOnSecondaryENI)). + Should(BeNumerically(">", 1)) + }) + + JustAfterEach(func() { + By("revoking security group ingress on instance security group") + err = f.CloudServices.EC2(). + RevokeSecurityGroupIngress(instanceSecurityGroupID, protocol, serverPort, serverPort, "0.0.0.0/0") + Expect(err).ToNot(HaveOccurred()) + + By("revoking security group egress on instance security group") + err = f.CloudServices.EC2(). + RevokeSecurityGroupEgress(instanceSecurityGroupID, protocol, serverPort, serverPort, "0.0.0.0/0") + Expect(err).ToNot(HaveOccurred()) + + By("deleting the primary node server deployment") + err = f.K8sResourceManagers.DeploymentManager(). + DeleteAndWaitTillDeploymentIsDeleted(primaryNodeDeployment) + Expect(err).ToNot(HaveOccurred()) + + By("deleting the secondary node server deployment") + err = f.K8sResourceManagers.DeploymentManager(). + DeleteAndWaitTillDeploymentIsDeleted(secondaryNodeDeployment) + Expect(err).ToNot(HaveOccurred()) + }) + + Context("when testing ICMP traffic", func() { + BeforeEach(func() { + // The number of packets to be sent + packetCount := 5 + // Protocol needs to be set allow ICMP traffic on the EC2 SG + protocol = "ICMP" + // ICMP doesn't need any port to be opened on the SG + serverPort = 0 + // Since ping doesn't need any server, just sleep on the server pod + serverListenCmd = []string{"sleep"} + serverListenCmdArgs = []string{"1000"} + + // Verify all the packets were transmitted and received successfully + testerExpectedStdOut = fmt.Sprintf("%d packets transmitted, "+ + "%d packets received", packetCount, packetCount) + testerExpectedStdErr = "" + + testConnectionCommandFunc = func(receiverPod coreV1.Pod, port int) []string { + return []string{"ping", "-c", strconv.Itoa(packetCount), receiverPod.Status.PodIP} + } + }) + + It("should allow ICMP traffic", func() { + CheckConnectivityForMultiplePodPlacement( + interfaceToPodListOnPrimaryNode, interfaceToPodListOnSecondaryNode, + serverPort, testerExpectedStdOut, testerExpectedStdErr, testConnectionCommandFunc) + }) + }) + + Context("when establishing UDP connection from tester to server", func() { + BeforeEach(func() { + serverPort = 2273 + protocol = ec2.ProtocolUdp + serverListenCmd = []string{"nc"} + // The nc flag "-l" for listen mode, "-k" to keep server up and not close + // connection after each connection, "-u" for udp + serverListenCmdArgs = []string{"-u", "-l", "-k", strconv.Itoa(serverPort)} + + // Verbose output from nc is being redirected to stderr instead of stdout + testerExpectedStdErr = "succeeded!" + testerExpectedStdOut = "" + + // The nc flag "-u" for UDP traffic, "-v" for verbose output and "-wn" for timing out + // in n seconds + testConnectionCommandFunc = func(receiverPod coreV1.Pod, port int) []string { + return []string{"nc", "-u", "-v", "-w2", receiverPod.Status.PodIP, strconv.Itoa(port)} + } + + // Create a negative test case with the wrong port number. This is to reinforce the + // positive test case work by verifying negative cases do throw error + testFailedConnectionCommandFunc = func(receiverPod coreV1.Pod, port int) []string { + return []string{"nc", "-u", "-v", "-w2", receiverPod.Status.PodIP, strconv.Itoa(port + 1)} + } + }) + + It("connection should be established", func() { + CheckConnectivityForMultiplePodPlacement( + interfaceToPodListOnPrimaryNode, interfaceToPodListOnSecondaryNode, + serverPort, testerExpectedStdOut, testerExpectedStdErr, testConnectionCommandFunc) + + By("verifying connection fails for unreachable port") + VerifyConnectivityFailsForNegativeCase(interfaceToPodListOnPrimaryNode.PodsOnPrimaryENI[0], + interfaceToPodListOnPrimaryNode.PodsOnPrimaryENI[1], serverPort, + testFailedConnectionCommandFunc) + }) + }) + + Context("when establishing TCP connection from tester to server", func() { + + BeforeEach(func() { + serverPort = 2273 + protocol = ec2.ProtocolTcp + // Test tcp connection using netcat + serverListenCmd = []string{"nc"} + // The nc flag "-l" for listen mode, "-k" to keep server up and not close + // connection after each connection + serverListenCmdArgs = []string{"-k", "-l", strconv.Itoa(serverPort)} + + // netcat verbose output is being redirected to stderr instead of stdout + testerExpectedStdErr = "succeeded!" + testerExpectedStdOut = "" + + // The nc flag "-v" for verbose output and "-wn" for timing out in n seconds + testConnectionCommandFunc = func(receiverPod coreV1.Pod, port int) []string { + return []string{"nc", "-v", "-w2", receiverPod.Status.PodIP, strconv.Itoa(port)} + } + + // Create a negative test case with the wrong port number. This is to reinforce the + // positive test case work by verifying negative cases do throw error + testFailedConnectionCommandFunc = func(receiverPod coreV1.Pod, port int) []string { + return []string{"nc", "-v", "-w2", receiverPod.Status.PodIP, strconv.Itoa(port + 1)} + } + }) + + It("should allow connection across nodes and across interface types", func() { + CheckConnectivityForMultiplePodPlacement( + interfaceToPodListOnPrimaryNode, interfaceToPodListOnSecondaryNode, + serverPort, testerExpectedStdOut, testerExpectedStdErr, testConnectionCommandFunc) + + By("verifying connection fails for unreachable port") + VerifyConnectivityFailsForNegativeCase(interfaceToPodListOnPrimaryNode.PodsOnPrimaryENI[0], + interfaceToPodListOnPrimaryNode.PodsOnPrimaryENI[1], serverPort, + testFailedConnectionCommandFunc) + }) + }) +}) + +func VerifyConnectivityFailsForNegativeCase(senderPod coreV1.Pod, receiverPod coreV1.Pod, port int, + getTestCommandFunc func(receiverPod coreV1.Pod, port int) []string) { + + testerCommand := getTestCommandFunc(receiverPod, port) + + fmt.Fprintf(GinkgoWriter, "verifying connectivity fails from pod %s on node %s with IP %s to pod"+ + " %s on node %s with IP %s\n", senderPod.Name, senderPod.Spec.NodeName, senderPod.Status.PodIP, + receiverPod.Name, receiverPod.Spec.NodeName, receiverPod.Status.PodIP) + + _, _, err := f.K8sResourceManagers.PodManager(). + PodExec(senderPod.Namespace, senderPod.Name, testerCommand) + Expect(err).To(HaveOccurred()) +} + +// CheckConnectivityForMultiplePodPlacement checks connectivity for various scenarios, an example +// connection from Pod on Node 1 having IP from Primary Network Interface to Pod on Node 2 having +// IP from Secondary Network Interface +func CheckConnectivityForMultiplePodPlacement(interfaceToPodListOnPrimaryNode InterfaceTypeToPodList, + interfaceToPodListOnSecondaryNode InterfaceTypeToPodList, port int, + testerExpectedStdOut string, testerExpectedStdErr string, + getTestCommandFunc func(receiverPod coreV1.Pod, port int) []string) { + + By("checking connection on same node, primary to primary") + testConnectivity( + interfaceToPodListOnPrimaryNode.PodsOnPrimaryENI[0], + interfaceToPodListOnPrimaryNode.PodsOnPrimaryENI[1], + testerExpectedStdOut, testerExpectedStdErr, port, getTestCommandFunc) + + By("checking connection on same node, primary to secondary") + testConnectivity( + interfaceToPodListOnPrimaryNode.PodsOnPrimaryENI[0], + interfaceToPodListOnPrimaryNode.PodsOnSecondaryENI[0], + testerExpectedStdOut, testerExpectedStdErr, port, getTestCommandFunc) + + By("checking connection on same node, secondary to secondary") + testConnectivity( + interfaceToPodListOnPrimaryNode.PodsOnSecondaryENI[0], + interfaceToPodListOnPrimaryNode.PodsOnSecondaryENI[1], + testerExpectedStdOut, testerExpectedStdErr, port, getTestCommandFunc) + + By("checking connection on different node, primary to primary") + testConnectivity( + interfaceToPodListOnPrimaryNode.PodsOnPrimaryENI[0], + interfaceToPodListOnSecondaryNode.PodsOnPrimaryENI[0], + testerExpectedStdOut, testerExpectedStdErr, port, getTestCommandFunc) + + By("checking connection on different node, primary to secondary") + testConnectivity( + interfaceToPodListOnPrimaryNode.PodsOnPrimaryENI[0], + interfaceToPodListOnSecondaryNode.PodsOnSecondaryENI[0], + testerExpectedStdOut, testerExpectedStdErr, port, getTestCommandFunc) + + By("checking connection on different node, secondary to secondary") + testConnectivity( + interfaceToPodListOnPrimaryNode.PodsOnSecondaryENI[0], + interfaceToPodListOnSecondaryNode.PodsOnSecondaryENI[0], + testerExpectedStdOut, testerExpectedStdErr, port, getTestCommandFunc) +} + +// testConnectivity verifies connectivity between tester and server +func testConnectivity(senderPod coreV1.Pod, receiverPod coreV1.Pod, expectedStdout string, + expectedStderr string, port int, getTestCommandFunc func(receiverPod coreV1.Pod, port int) []string) { + + testerCommand := getTestCommandFunc(receiverPod, port) + + fmt.Fprintf(GinkgoWriter, "verifying connectivity from pod %s on node %s with IP %s to pod"+ + " %s on node %s with IP %s\n", senderPod.Name, senderPod.Spec.NodeName, senderPod.Status.PodIP, + receiverPod.Name, receiverPod.Spec.NodeName, receiverPod.Status.PodIP) + + stdOut, stdErr, err := f.K8sResourceManagers.PodManager(). + PodExec(senderPod.Namespace, senderPod.Name, testerCommand) + Expect(err).ToNot(HaveOccurred()) + + fmt.Fprintf(GinkgoWriter, "stdout: %s and stderr: %s\n", stdOut, stdErr) + + Expect(stdErr).To(ContainSubstring(expectedStderr)) + Expect(stdOut).To(ContainSubstring(expectedStdout)) +} + +type InterfaceTypeToPodList struct { + PodsOnPrimaryENI []coreV1.Pod + PodsOnSecondaryENI []coreV1.Pod +} + +// GetPodsOnPrimaryAndSecondaryInterface returns the list of Pods on Primary Networking +// Interface and Secondary Network Interface on a given Node +func GetPodsOnPrimaryAndSecondaryInterface(node coreV1.Node, + podLabelKey string, podLabelVal string) InterfaceTypeToPodList { + podList, err := f.K8sResourceManagers. + PodManager(). + GetPodsWithLabelSelector(podLabelKey, podLabelVal) + Expect(err).ToNot(HaveOccurred()) + + instance, err := f.CloudServices.EC2(). + DescribeInstance(k8sUtils.GetInstanceIDFromNode(node)) + Expect(err).ToNot(HaveOccurred()) + + interfaceToPodList := InterfaceTypeToPodList{ + PodsOnPrimaryENI: []coreV1.Pod{}, + PodsOnSecondaryENI: []coreV1.Pod{}, + } + + ipToPod := map[string]coreV1.Pod{} + for _, pod := range podList.Items { + ipToPod[pod.Status.PodIP] = pod + } + + for _, nwInterface := range instance.NetworkInterfaces { + isPrimary := IsPrimaryENI(nwInterface, instance.PrivateIpAddress) + for _, ip := range nwInterface.PrivateIpAddresses { + if pod, found := ipToPod[*ip.PrivateIpAddress]; found { + if isPrimary { + interfaceToPodList.PodsOnPrimaryENI = + append(interfaceToPodList.PodsOnPrimaryENI, pod) + } else { + interfaceToPodList.PodsOnSecondaryENI = + append(interfaceToPodList.PodsOnSecondaryENI, pod) + } + } + } + } + return interfaceToPodList +} + +func IsPrimaryENI(nwInterface *ec2.InstanceNetworkInterface, instanceIPAddr *string) bool { + for _, privateIPAddress := range nwInterface.PrivateIpAddresses { + if *privateIPAddress.PrivateIpAddress == *instanceIPAddr { + return true + } + } + return false +}