diff --git a/pkg/yurthub/filter/approver_test.go b/pkg/yurthub/filter/approver_test.go new file mode 100644 index 00000000000..6242460f650 --- /dev/null +++ b/pkg/yurthub/filter/approver_test.go @@ -0,0 +1,65 @@ +/* +Copyright 2020 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "testing" +) + +func TestApprove(t *testing.T) { + testcases := map[string]struct { + comp string + resource string + verbs []string + comp2 string + resource2 string + verb2 string + expectedResult bool + }{ + "normal case": { + "kubelet", "services", []string{"list", "watch"}, + "kubelet", "services", "list", + true, + }, + "components are not equal": { + "kubelet", "services", []string{"list", "watch"}, + "kube-proxy", "services", "list", + false, + }, + "resources are not equal": { + "kubelet", "services", []string{"list", "watch"}, + "kubelet", "pods", "list", + false, + }, + "verb is not in verbs set": { + "kubelet", "services", []string{"list", "watch"}, + "kubelet", "services", "get", + false, + }, + } + + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + approver := NewApprover(tt.comp, tt.resource, tt.verbs...) + result := approver.Approve(tt.comp2, tt.resource2, tt.verb2) + + if result != tt.expectedResult { + t.Errorf("Approve error: expected %v, but got %v\n", tt.expectedResult, result) + } + }) + } +} diff --git a/pkg/yurthub/filter/discardcloudservice/handler_test.go b/pkg/yurthub/filter/discardcloudservice/handler_test.go new file mode 100644 index 00000000000..7fe1f8aa2e9 --- /dev/null +++ b/pkg/yurthub/filter/discardcloudservice/handler_test.go @@ -0,0 +1,412 @@ +/* +Copyright 2020 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package discardcloudservice + +import ( + "bytes" + "io" + "io/ioutil" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" +) + +func TestObjectResponseFilter(t *testing.T) { + testcases := map[string]struct { + group string + version string + resources string + accept string + originalList runtime.Object + expectResult runtime.Object + }{ + "serviceList contains LoadBalancer service with SkipDiscardServiceAnnotation is not true": { + group: "", + version: "v1", + resources: "services", + accept: "application/json", + originalList: &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + filter.SkipDiscardServiceAnnotation: "false", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + expectResult: &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + }, + "serviceList contains LoadBalancer service, but SkipDiscardServiceAnnotation is true": { + group: "", + version: "v1", + resources: "services", + accept: "application/json", + originalList: &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + filter.SkipDiscardServiceAnnotation: "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + expectResult: &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + filter.SkipDiscardServiceAnnotation: "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + }, + "not serviceList": { + group: "", + version: "v1", + resources: "pods", + accept: "application/json", + originalList: &corev1.PodList{ + Items: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + }, + expectResult: &corev1.PodList{ + Items: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + }, + }, + } + + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + fh := &discardCloudServiceFilterHandler{ + serializer: serializer.NewSerializerManager(). + CreateSerializer(tt.accept, tt.group, tt.version, tt.resources), + } + + originalBytes, err := fh.serializer.Encode(tt.originalList) + if err != nil { + t.Errorf("encode originalList error: %v\n", err) + } + + filteredBytes, err := fh.ObjectResponseFilter(originalBytes) + if err != nil { + t.Errorf("ObjectResponseFilter got error: %v\n", err) + } + + expectedBytes, err := fh.serializer.Encode(tt.expectResult) + if err != nil { + t.Errorf("encode expectedResult error: %v\n", err) + } + + if !bytes.Equal(filteredBytes, expectedBytes) { + result, _ := fh.serializer.Decode(filteredBytes) + t.Errorf("ObjectResponseFilter got error, expected: \n%v\nbut got: \n%v\n", tt.expectResult, result) + } + }) + } +} + +func TestStreamResponseFilter(t *testing.T) { + testcases := map[string]struct { + group string + version string + resources string + accept string + inputObj []watch.Event + expectResult []runtime.Object + }{ + "watch services that contain LoadBalancer service with SkipDiscardServiceAnnotation is not true": { + group: "", + version: "v1", + resources: "services", + accept: "application/json", + inputObj: []watch.Event{ + {Type: watch.Modified, Object: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + filter.SkipDiscardServiceAnnotation: "false", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }}, + {Type: watch.Modified, Object: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }}, + }, + expectResult: []runtime.Object{ + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + "watch services that contain LoadBalancer service, but SkipDiscardServiceAnnotation is true": { + group: "", + version: "v1", + resources: "services", + accept: "application/json", + inputObj: []watch.Event{ + {Type: watch.Modified, Object: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + filter.SkipDiscardServiceAnnotation: "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }}, + {Type: watch.Modified, Object: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }}, + }, + expectResult: []runtime.Object{ + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + filter.SkipDiscardServiceAnnotation: "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + "watch pods": { + group: "", + version: "v1", + resources: "services", + accept: "application/json", + inputObj: []watch.Event{ + {Type: watch.Modified, Object: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }}, + }, + expectResult: []runtime.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + }, + } + + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + fh := &discardCloudServiceFilterHandler{ + serializer: serializer.NewSerializerManager(). + CreateSerializer(tt.accept, tt.group, tt.version, tt.resources), + } + + r, w := io.Pipe() + go func(w *io.PipeWriter) { + for i := range tt.inputObj { + if _, err := fh.serializer.WatchEncode(w, &tt.inputObj[i]); err != nil { + t.Errorf("%d: encode watch unexpected error: %v", i, err) + continue + } + time.Sleep(100 * time.Millisecond) + } + w.Close() + }(w) + + rc := ioutil.NopCloser(r) + ch := make(chan watch.Event, len(tt.inputObj)) + + go func(rc io.ReadCloser, ch chan watch.Event) { + fh.StreamResponseFilter(rc, ch) + }(rc, ch) + + for i := 0; i < len(tt.expectResult); i++ { + event := <-ch + + resultBytes, _ := fh.serializer.Encode(event.Object) + expectedBytes, _ := fh.serializer.Encode(tt.expectResult[i]) + + if !bytes.Equal(resultBytes, expectedBytes) { + t.Errorf("StreamResponseFilter got error, expected: \n%v\nbut got: \n%v\n", tt.expectResult[i], event.Object) + break + } + } + }) + } +} diff --git a/pkg/yurthub/filter/masterservice/handler_test.go b/pkg/yurthub/filter/masterservice/handler_test.go new file mode 100644 index 00000000000..92a0ca21092 --- /dev/null +++ b/pkg/yurthub/filter/masterservice/handler_test.go @@ -0,0 +1,465 @@ +/* +Copyright 2020 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package masterservice + +import ( + "bytes" + "io" + "io/ioutil" + "net/http" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" +) + +func TestObjectResponseFilter(t *testing.T) { + fh := &masterServiceFilterHandler{ + host: "169.251.2.1", + port: 10268, + } + + testcases := map[string]struct { + group string + version string + resources string + userAgent string + accept string + verb string + path string + originalList runtime.Object + expectResult runtime.Object + }{ + "serviceList contains kubernetes service": { + group: "", + version: "v1", + resources: "services", + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/api/v1/namespaces/default/services", + originalList: &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: MasterServiceName, + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.0.1", + Ports: []corev1.ServicePort{ + { + Port: 443, + Name: MasterServicePortName, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Ports: []corev1.ServicePort{ + { + Port: 80, + }, + }, + }, + }, + }, + }, + expectResult: &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: MasterServiceName, + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: fh.host, + Ports: []corev1.ServicePort{ + { + Port: fh.port, + Name: MasterServicePortName, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Ports: []corev1.ServicePort{ + { + Port: 80, + }, + }, + }, + }, + }, + }, + }, + "serviceList does not contain kubernetes service": { + group: "", + version: "v1", + resources: "services", + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/api/v1/namespaces/default/services", + originalList: &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Ports: []corev1.ServicePort{ + { + Port: 80, + }, + }, + }, + }, + }, + }, + expectResult: &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Ports: []corev1.ServicePort{ + { + Port: 80, + }, + }, + }, + }, + }, + }, + }, + "not serviceList": { + group: "", + version: "v1", + resources: "pods", + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/api/v1/namespaces/default/pods", + originalList: &corev1.PodList{ + Items: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + }, + expectResult: &corev1.PodList{ + Items: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + }, + }, + } + + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + req, _ := http.NewRequest(tt.verb, tt.path, nil) + req.Header.Set("User-Agent", tt.userAgent) + req.Header.Set("Accept", tt.accept) + fh.req = req + fh.serializer = serializer.NewSerializerManager(). + CreateSerializer(tt.accept, tt.group, tt.version, tt.resources) + + originalBytes, err := fh.serializer.Encode(tt.originalList) + if err != nil { + t.Errorf("encode originalList error: %v\n", err) + } + + filteredBytes, err := fh.ObjectResponseFilter(originalBytes) + if err != nil { + t.Errorf("ObjectResponseFilter got error: %v\n", err) + } + + expectedBytes, err := fh.serializer.Encode(tt.expectResult) + if err != nil { + t.Errorf("encode expectedResult error: %v\n", err) + } + + if !bytes.Equal(filteredBytes, expectedBytes) { + result, _ := fh.serializer.Decode(filteredBytes) + t.Errorf("ObjectResponseFilter got error, expected: \n%v\nbut got: \n%v\n", tt.expectResult, result) + } + }) + } +} + +func TestStreamResponseFilter(t *testing.T) { + fh := &masterServiceFilterHandler{ + host: "169.251.2.1", + port: 10268, + } + + testcases := map[string]struct { + group string + version string + resources string + userAgent string + accept string + verb string + path string + inputObj []watch.Event + expectResult []runtime.Object + }{ + "watch kubernetes service": { + group: "", + version: "v1", + resources: "services", + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/api/v1/namespaces/default/services?watch=true", + inputObj: []watch.Event{ + {Type: watch.Modified, Object: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: MasterServiceName, + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.0.1", + Ports: []corev1.ServicePort{ + { + Port: 443, + Name: MasterServicePortName, + }, + }, + }, + }}, + {Type: watch.Modified, Object: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Ports: []corev1.ServicePort{ + { + Port: 80, + }, + }, + }, + }}, + }, + expectResult: []runtime.Object{ + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: MasterServiceName, + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: fh.host, + Ports: []corev1.ServicePort{ + { + Port: fh.port, + Name: MasterServicePortName, + }, + }, + }, + }, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Ports: []corev1.ServicePort{ + { + Port: 80, + }, + }, + }, + }, + }, + }, + "watch without kubernetes service": { + group: "", + version: "v1", + resources: "services", + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/api/v1/namespaces/default/services?watch=true", + inputObj: []watch.Event{ + {Type: watch.Modified, Object: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.188.105", + Ports: []corev1.ServicePort{ + { + Port: 80, + }, + }, + }, + }}, + }, + expectResult: []runtime.Object{ + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.188.105", + Ports: []corev1.ServicePort{ + { + Port: 80, + }, + }, + }, + }, + }, + }, + "watch pods": { + group: "", + version: "v1", + resources: "pods", + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/api/v1/namespaces/default/pods?watch=true", + inputObj: []watch.Event{ + {Type: watch.Modified, Object: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }}, + }, + expectResult: []runtime.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + }, + } + + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + req, _ := http.NewRequest(tt.verb, tt.path, nil) + req.Header.Set("User-Agent", tt.userAgent) + req.Header.Set("Accept", tt.accept) + fh.req = req + + fh.serializer = serializer.NewSerializerManager(). + CreateSerializer(tt.accept, tt.group, tt.version, tt.resources) + + r, w := io.Pipe() + go func(w *io.PipeWriter) { + for i := range tt.inputObj { + if _, err := fh.serializer.WatchEncode(w, &tt.inputObj[i]); err != nil { + t.Errorf("%d: encode watch unexpected error: %v", i, err) + continue + } + time.Sleep(100 * time.Millisecond) + } + w.Close() + }(w) + + rc := ioutil.NopCloser(r) + ch := make(chan watch.Event, len(tt.inputObj)) + + go func(rc io.ReadCloser, ch chan watch.Event) { + fh.StreamResponseFilter(rc, ch) + }(rc, ch) + + for i := 0; i < len(tt.expectResult); i++ { + event := <-ch + + resultBytes, _ := fh.serializer.Encode(event.Object) + expectedBytes, _ := fh.serializer.Encode(tt.expectResult[i]) + + if !bytes.Equal(resultBytes, expectedBytes) { + t.Errorf("StreamResponseFilter got error, expected: \n%v\nbut got: \n%v\n", tt.expectResult[i], event.Object) + break + } + } + }) + } +} diff --git a/pkg/yurthub/filter/servicetopology/handler_test.go b/pkg/yurthub/filter/servicetopology/handler_test.go new file mode 100644 index 00000000000..979c5ac6f98 --- /dev/null +++ b/pkg/yurthub/filter/servicetopology/handler_test.go @@ -0,0 +1,851 @@ +/* +Copyright 2020 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package servicetopology + +import ( + "context" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + k8sfake "k8s.io/client-go/kubernetes/fake" + + nodepoolv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1" + yurtfake "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned/fake" + yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" +) + +func TestReassembleEndpointSlice(t *testing.T) { + currentNodeName := "node1" + + testcases := map[string]struct { + endpointSlice *discovery.EndpointSlice + kubeClient *k8sfake.Clientset + yurtClient *yurtfake.Clientset + expectResult *discovery.EndpointSlice + }{ + "service with annotation openyurt.io/topologyKeys: kubernetes.io/hostname": { + endpointSlice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1-np7sf", + Namespace: "default", + Labels: map[string]string{ + discovery.LabelServiceName: "svc1", + }, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + "10.244.1.2", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.3", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node2", + }, + }, + { + Addresses: []string{ + "10.244.1.4", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.5", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node3", + }, + }, + }, + }, + kubeClient: k8sfake.NewSimpleClientset( + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: currentNodeName, + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + }, + }, + }, + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + }, + }, + }, + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node3", + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + }, + }, + }, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + AnnotationServiceTopologyKey: AnnotationServiceTopologyValueNode, + }, + }, + }, + ), + yurtClient: yurtfake.NewSimpleClientset( + &nodepoolv1alpha1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hangzhou", + }, + Spec: nodepoolv1alpha1.NodePoolSpec{ + Type: nodepoolv1alpha1.Edge, + }, + Status: nodepoolv1alpha1.NodePoolStatus{ + Nodes: []string{ + currentNodeName, + "node3", + }, + }, + }, + &nodepoolv1alpha1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shanghai", + }, + Spec: nodepoolv1alpha1.NodePoolSpec{ + Type: nodepoolv1alpha1.Edge, + }, + Status: nodepoolv1alpha1.NodePoolStatus{ + Nodes: []string{ + "node2", + }, + }, + }, + ), + expectResult: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1-np7sf", + Namespace: "default", + Labels: map[string]string{ + discovery.LabelServiceName: "svc1", + }, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + "10.244.1.2", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.4", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + }, + }, + }, + "service with annotation openyurt.io/topologyKeys: openyurt.io/nodepool": { + endpointSlice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1-np7sf", + Namespace: "default", + Labels: map[string]string{ + discovery.LabelServiceName: "svc1", + }, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + "10.244.1.2", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.3", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node2", + }, + }, + { + Addresses: []string{ + "10.244.1.4", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.5", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node3", + }, + }, + }, + }, + kubeClient: k8sfake.NewSimpleClientset( + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: currentNodeName, + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + }, + }, + }, + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + }, + }, + }, + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node3", + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + }, + }, + }, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + AnnotationServiceTopologyKey: AnnotationServiceTopologyValueNodePool, + }, + }, + }, + ), + yurtClient: yurtfake.NewSimpleClientset( + &nodepoolv1alpha1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hangzhou", + }, + Spec: nodepoolv1alpha1.NodePoolSpec{ + Type: nodepoolv1alpha1.Edge, + }, + Status: nodepoolv1alpha1.NodePoolStatus{ + Nodes: []string{ + currentNodeName, + "node3", + }, + }, + }, + &nodepoolv1alpha1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shanghai", + }, + Spec: nodepoolv1alpha1.NodePoolSpec{ + Type: nodepoolv1alpha1.Edge, + }, + Status: nodepoolv1alpha1.NodePoolStatus{ + Nodes: []string{ + "node2", + }, + }, + }, + ), + expectResult: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1-np7sf", + Namespace: "default", + Labels: map[string]string{ + discovery.LabelServiceName: "svc1", + }, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + "10.244.1.2", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.4", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.5", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node3", + }, + }, + }, + }, + }, + "service with annotation openyurt.io/topologyKeys: kubernetes.io/zone": { + endpointSlice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1-np7sf", + Namespace: "default", + Labels: map[string]string{ + discovery.LabelServiceName: "svc1", + }, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + "10.244.1.2", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.3", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node2", + }, + }, + { + Addresses: []string{ + "10.244.1.4", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.5", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node3", + }, + }, + }, + }, + kubeClient: k8sfake.NewSimpleClientset( + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: currentNodeName, + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + }, + }, + }, + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + }, + }, + }, + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node3", + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + }, + }, + }, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + AnnotationServiceTopologyKey: AnnotationServiceTopologyValueNodePool, + }, + }, + }, + ), + yurtClient: yurtfake.NewSimpleClientset( + &nodepoolv1alpha1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hangzhou", + }, + Spec: nodepoolv1alpha1.NodePoolSpec{ + Type: nodepoolv1alpha1.Edge, + }, + Status: nodepoolv1alpha1.NodePoolStatus{ + Nodes: []string{ + currentNodeName, + "node3", + }, + }, + }, + &nodepoolv1alpha1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shanghai", + }, + Spec: nodepoolv1alpha1.NodePoolSpec{ + Type: nodepoolv1alpha1.Edge, + }, + Status: nodepoolv1alpha1.NodePoolStatus{ + Nodes: []string{ + "node2", + }, + }, + }, + ), + expectResult: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1-np7sf", + Namespace: "default", + Labels: map[string]string{ + discovery.LabelServiceName: "svc1", + }, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + "10.244.1.2", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.4", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.5", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node3", + }, + }, + }, + }, + }, + "service without annotation openyurt.io/topologyKeys": { + endpointSlice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1-np7sf", + Namespace: "default", + Labels: map[string]string{ + discovery.LabelServiceName: "svc1", + }, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + "10.244.1.2", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.3", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node2", + }, + }, + { + Addresses: []string{ + "10.244.1.4", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.5", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node3", + }, + }, + }, + }, + kubeClient: k8sfake.NewSimpleClientset( + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: currentNodeName, + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + }, + }, + }, + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + }, + }, + }, + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node3", + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + }, + }, + }, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{}, + }, + }, + ), + yurtClient: yurtfake.NewSimpleClientset( + &nodepoolv1alpha1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hangzhou", + }, + Spec: nodepoolv1alpha1.NodePoolSpec{ + Type: nodepoolv1alpha1.Edge, + }, + Status: nodepoolv1alpha1.NodePoolStatus{ + Nodes: []string{ + currentNodeName, + "node3", + }, + }, + }, + &nodepoolv1alpha1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shanghai", + }, + Spec: nodepoolv1alpha1.NodePoolSpec{ + Type: nodepoolv1alpha1.Edge, + }, + Status: nodepoolv1alpha1.NodePoolStatus{ + Nodes: []string{ + "node2", + }, + }, + }, + ), + expectResult: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1-np7sf", + Namespace: "default", + Labels: map[string]string{ + discovery.LabelServiceName: "svc1", + }, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + "10.244.1.2", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.3", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node2", + }, + }, + { + Addresses: []string{ + "10.244.1.4", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.5", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node3", + }, + }, + }, + }, + }, + "currentNode is not in any nodepool": { + endpointSlice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1-np7sf", + Namespace: "default", + Labels: map[string]string{ + discovery.LabelServiceName: "svc1", + }, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + "10.244.1.2", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.3", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node2", + }, + }, + { + Addresses: []string{ + "10.244.1.4", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.5", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node3", + }, + }, + }, + }, + kubeClient: k8sfake.NewSimpleClientset( + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: currentNodeName, + Labels: map[string]string{}, + }, + }, + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + }, + }, + }, + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node3", + Labels: map[string]string{ + nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + }, + }, + }, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{}, + }, + }, + ), + yurtClient: yurtfake.NewSimpleClientset( + &nodepoolv1alpha1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hangzhou", + }, + Spec: nodepoolv1alpha1.NodePoolSpec{ + Type: nodepoolv1alpha1.Edge, + }, + Status: nodepoolv1alpha1.NodePoolStatus{ + Nodes: []string{ + "node3", + }, + }, + }, + &nodepoolv1alpha1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shanghai", + }, + Spec: nodepoolv1alpha1.NodePoolSpec{ + Type: nodepoolv1alpha1.Edge, + }, + Status: nodepoolv1alpha1.NodePoolStatus{ + Nodes: []string{ + "node2", + }, + }, + }, + ), + expectResult: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1-np7sf", + Namespace: "default", + Labels: map[string]string{ + discovery.LabelServiceName: "svc1", + }, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + "10.244.1.2", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.3", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node2", + }, + }, + { + Addresses: []string{ + "10.244.1.4", + }, + Topology: map[string]string{ + corev1.LabelHostname: currentNodeName, + }, + }, + { + Addresses: []string{ + "10.244.1.5", + }, + Topology: map[string]string{ + corev1.LabelHostname: "node3", + }, + }, + }, + }, + }, + } + + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + tt.kubeClient.DiscoveryV1beta1().EndpointSlices("default").Create(context.TODO(), tt.endpointSlice, metav1.CreateOptions{}) + + factory := informers.NewSharedInformerFactory(tt.kubeClient, 24*time.Hour) + serviceInformer := factory.Core().V1().Services() + serviceInformer.Informer() + serviceLister := serviceInformer.Lister() + + stopper := make(chan struct{}) + defer close(stopper) + factory.Start(stopper) + factory.WaitForCacheSync(stopper) + + yurtFactory := yurtinformers.NewSharedInformerFactory(tt.yurtClient, 24*time.Hour) + nodePoolInformer := yurtFactory.Apps().V1alpha1().NodePools() + nodePoolLister := nodePoolInformer.Lister() + + stopper2 := make(chan struct{}) + defer close(stopper2) + yurtFactory.Start(stopper2) + yurtFactory.WaitForCacheSync(stopper2) + + nodeGetter := func(name string) (*corev1.Node, error) { + return tt.kubeClient.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{}) + } + + fh := &serviceTopologyFilterHandler{ + nodeName: currentNodeName, + serviceLister: serviceLister, + nodePoolLister: nodePoolLister, + nodeGetter: nodeGetter, + } + + reassembledEndpointSlice := fh.reassembleEndpointSlice(tt.endpointSlice) + + if !isEqualEndpointSlice(reassembledEndpointSlice, tt.expectResult) { + t.Errorf("reassembleEndpointSlice got error, expected: \n%v\nbut got: \n%v\n", tt.expectResult, reassembledEndpointSlice) + } + + }) + } +} + +// isEqualEndpointSlice is used to determine whether two endpointSlice are equal. +// Note that this function can only be used in this test. +func isEqualEndpointSlice(endpointSlice1, endpointSlice2 *discovery.EndpointSlice) bool { + if endpointSlice1.Name != endpointSlice2.Name || + endpointSlice1.Namespace != endpointSlice2.Namespace || + endpointSlice1.Labels[discovery.LabelServiceName] != endpointSlice2.Labels[discovery.LabelServiceName] { + return false + } + + endpoints1 := endpointSlice1.Endpoints + endpoints2 := endpointSlice2.Endpoints + if len(endpoints1) != len(endpoints2) { + return false + } + + for i := 0; i < len(endpoints1); i++ { + if !isEqualStrings(endpoints1[i].Addresses, endpoints2[i].Addresses) { + return false + } + + if endpoints1[i].Topology[corev1.LabelHostname] != endpoints2[i].Topology[corev1.LabelHostname] { + return false + } + } + + return true +} + +func isEqualStrings(s1, s2 []string) bool { + if len(s1) != len(s2) { + return false + } + + for i := 0; i < len(s1); i++ { + if s1[i] != s2[i] { + return false + } + } + + return true +}