From 775d69d5bf5ca198c5b2b5b6a84a0a07200854ee Mon Sep 17 00:00:00 2001 From: rambohe Date: Mon, 21 Nov 2022 12:41:16 +0800 Subject: [PATCH] 1. bugfix: StreamResponseFilter of data filter framework can't work if size of one object is over 32KB (#1066) 2. improve data filter framework - define ObjectHandler interface for filtering Runtime.Object, and other common routines are removed to FilterReadCloser - add SerializerManager as common parameter for all filters. --- .../filter/discardcloudservice/filter.go | 26 +- .../filter/discardcloudservice/handler.go | 114 ++--- .../discardcloudservice/handler_test.go | 384 ++++++-------- pkg/yurthub/filter/filter.go | 179 ++++--- pkg/yurthub/filter/filter_test.go | 471 ++++++++++++++++++ pkg/yurthub/filter/initializer/initializer.go | 15 - pkg/yurthub/filter/interfaces.go | 18 +- pkg/yurthub/filter/manager/manager.go | 15 +- pkg/yurthub/filter/masterservice/filter.go | 27 +- pkg/yurthub/filter/masterservice/handler.go | 120 ++--- .../filter/masterservice/handler_test.go | 374 +++----------- pkg/yurthub/filter/servicetopology/filter.go | 24 +- pkg/yurthub/filter/servicetopology/handler.go | 54 +- .../filter/servicetopology/handler_test.go | 250 +++------- pkg/yurthub/filter/util/utils.go | 38 -- 15 files changed, 984 insertions(+), 1125 deletions(-) create mode 100644 pkg/yurthub/filter/filter_test.go delete mode 100644 pkg/yurthub/filter/util/utils.go diff --git a/pkg/yurthub/filter/discardcloudservice/filter.go b/pkg/yurthub/filter/discardcloudservice/filter.go index 739ae222659..a9f448f5000 100644 --- a/pkg/yurthub/filter/discardcloudservice/filter.go +++ b/pkg/yurthub/filter/discardcloudservice/filter.go @@ -21,22 +21,22 @@ import ( "net/http" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/filter" - filterutil "github.com/openyurtio/openyurt/pkg/yurthub/filter/util" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" ) // Register registers a filter -func Register(filters *filter.Filters) { +func Register(filters *filter.Filters, sm *serializer.SerializerManager) { filters.Register(filter.DiscardCloudServiceFilterName, func() (filter.Runner, error) { - return NewFilter(), nil + return NewFilter(sm), nil }) } -func NewFilter() *discardCloudServiceFilter { - return &discardCloudServiceFilter{} +func NewFilter(sm *serializer.SerializerManager) *discardCloudServiceFilter { + return &discardCloudServiceFilter{ + serializerManager: sm, + } } type discardCloudServiceFilter struct { @@ -53,18 +53,6 @@ func (sf *discardCloudServiceFilter) SupportedResourceAndVerbs() map[string]sets } } -func (sf *discardCloudServiceFilter) SetSerializerManager(s *serializer.SerializerManager) error { - sf.serializerManager = s - return nil -} - func (sf *discardCloudServiceFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { - s := filterutil.CreateSerializer(req, sf.serializerManager) - if s == nil { - klog.Errorf("skip filter, failed to create serializer in discardCloudServiceFilter") - return 0, rc, nil - } - - handler := NewDiscardCloudServiceFilterHandler(s) - return filter.NewFilterReadCloser(req, rc, handler, s, filter.DiscardCloudServiceFilterName, stopCh) + return filter.NewFilterReadCloser(req, sf.serializerManager, rc, NewDiscardCloudServiceFilterHandler(), sf.Name(), stopCh) } diff --git a/pkg/yurthub/filter/discardcloudservice/handler.go b/pkg/yurthub/filter/discardcloudservice/handler.go index a831e878f37..aaeca9bf399 100644 --- a/pkg/yurthub/filter/discardcloudservice/handler.go +++ b/pkg/yurthub/filter/discardcloudservice/handler.go @@ -18,14 +18,12 @@ package discardcloudservice import ( "fmt" - "io" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/filter" - "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" ) var ( @@ -34,91 +32,51 @@ var ( } ) -type discardCloudServiceFilterHandler struct { - serializer *serializer.Serializer -} +type discardCloudServiceFilterHandler struct{} -func NewDiscardCloudServiceFilterHandler(serializer *serializer.Serializer) filter.Handler { - return &discardCloudServiceFilterHandler{ - serializer: serializer, - } +func NewDiscardCloudServiceFilterHandler() filter.ObjectHandler { + return &discardCloudServiceFilterHandler{} } -// ObjectResponseFilter remove the cloud service(like LoadBalancer service) from response object -func (fh *discardCloudServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) { - list, err := fh.serializer.Decode(b) - if err != nil || list == nil { - klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of discardCloudServiceFilterHandler %v", err) - return b, nil - } - - serviceList, ok := list.(*v1.ServiceList) - if ok { +// RuntimeObjectFilter remove the cloud service(like LoadBalancer service) from response object +func (fh *discardCloudServiceFilterHandler) RuntimeObjectFilter(obj runtime.Object) (runtime.Object, bool) { + switch v := obj.(type) { + case *v1.ServiceList: var svcNew []v1.Service - for i := range serviceList.Items { - nsName := fmt.Sprintf("%s/%s", serviceList.Items[i].Namespace, serviceList.Items[i].Name) - // remove lb service - if serviceList.Items[i].Spec.Type == v1.ServiceTypeLoadBalancer { - if serviceList.Items[i].Annotations[filter.SkipDiscardServiceAnnotation] != "true" { - klog.V(2).Infof("load balancer service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName) - continue - } - } - - // remove cloud clusterIP service - if _, ok := cloudClusterIPService[nsName]; ok { - klog.V(2).Infof("clusterIP service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName) - continue + for i := range v.Items { + svc := discardCloudService(&v.Items[i]) + if svc != nil { + svcNew = append(svcNew, *svc) } - - svcNew = append(svcNew, serviceList.Items[i]) } - serviceList.Items = svcNew - return fh.serializer.Encode(serviceList) + v.Items = svcNew + return v, false + case *v1.Service: + svc := discardCloudService(v) + if svc == nil { + return svc, true + } + return svc, false + default: + return v, false } - - return b, nil } -// StreamResponseFilter filter the cloud service(like LoadBalancer service) from watch stream response -func (fh *discardCloudServiceFilterHandler) StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error { - defer func() { - close(ch) - }() - - d, err := fh.serializer.WatchDecoder(rc) - if err != nil { - klog.Errorf("StreamResponseFilter for discardCloudServiceFilterHandler ended with error, %v", err) - return err - } - - for { - watchType, obj, err := d.Decode() - if err != nil { - return err - } - - service, ok := obj.(*v1.Service) - if ok { - nsName := fmt.Sprintf("%s/%s", service.Namespace, service.Name) - // remove cloud LoadBalancer service - if service.Spec.Type == v1.ServiceTypeLoadBalancer { - if service.Annotations[filter.SkipDiscardServiceAnnotation] != "true" { - klog.V(2).Infof("load balancer service(%s) is discarded in StreamResponseFilter of discardCloudServiceFilterHandler", nsName) - continue - } - } - - // remove cloud clusterIP service - if _, ok := cloudClusterIPService[nsName]; ok { - klog.V(2).Infof("clusterIP service(%s) is discarded in StreamResponseFilter of discardCloudServiceFilterHandler", nsName) - continue - } +func discardCloudService(svc *v1.Service) *v1.Service { + nsName := fmt.Sprintf("%s/%s", svc.Namespace, svc.Name) + // remove cloud LoadBalancer service + if svc.Spec.Type == v1.ServiceTypeLoadBalancer { + if svc.Annotations[filter.SkipDiscardServiceAnnotation] != "true" { + klog.V(2).Infof("load balancer service(%s) is discarded in StreamResponseFilter of discardCloudServiceFilterHandler", nsName) + return nil } + } - var wEvent watch.Event - wEvent.Type = watchType - wEvent.Object = obj - ch <- wEvent + // remove cloud clusterIP service + if _, ok := cloudClusterIPService[nsName]; ok { + klog.V(2).Infof("clusterIP service(%s) is discarded in StreamResponseFilter of discardCloudServiceFilterHandler", nsName) + return nil } + + return svc } diff --git a/pkg/yurthub/filter/discardcloudservice/handler_test.go b/pkg/yurthub/filter/discardcloudservice/handler_test.go index d33b6a947d8..8f5a9caffb5 100644 --- a/pkg/yurthub/filter/discardcloudservice/handler_test.go +++ b/pkg/yurthub/filter/discardcloudservice/handler_test.go @@ -17,35 +17,23 @@ limitations under the License. package discardcloudservice import ( - "bytes" - "io" + "reflect" "testing" - "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" "github.com/openyurtio/openyurt/pkg/yurthub/filter" - "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" ) -func TestObjectResponseFilter(t *testing.T) { +func TestRuntimeObjectFilter(t *testing.T) { testcases := map[string]struct { - group string - version string - resources string - accept string - originalList runtime.Object - expectResult runtime.Object + responseObj runtime.Object + expectObj runtime.Object }{ - "serviceList contains LoadBalancer service with SkipDiscardServiceAnnotation is not true": { - group: "", - version: "v1", - resources: "services", - accept: "application/json", - originalList: &corev1.ServiceList{ + "discard lb service for serviceList": { + responseObj: &corev1.ServiceList{ Items: []corev1.Service{ { ObjectMeta: metav1.ObjectMeta{ @@ -72,7 +60,7 @@ func TestObjectResponseFilter(t *testing.T) { }, }, }, - expectResult: &corev1.ServiceList{ + expectObj: &corev1.ServiceList{ Items: []corev1.Service{ { ObjectMeta: metav1.ObjectMeta{ @@ -87,12 +75,48 @@ func TestObjectResponseFilter(t *testing.T) { }, }, }, - "serviceList contains LoadBalancer service, but SkipDiscardServiceAnnotation is true": { - group: "", - version: "v1", - resources: "services", - accept: "application/json", - originalList: &corev1.ServiceList{ + "discard cloud clusterIP service for serviceList": { + responseObj: &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "x-tunnel-server-internal-svc", + Namespace: "kube-system", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + expectObj: &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + }, + "doesn't discard service for serviceList": { + responseObj: &corev1.ServiceList{ Items: []corev1.Service{ { ObjectMeta: metav1.ObjectMeta{ @@ -119,7 +143,7 @@ func TestObjectResponseFilter(t *testing.T) { }, }, }, - expectResult: &corev1.ServiceList{ + expectObj: &corev1.ServiceList{ Items: []corev1.Service{ { ObjectMeta: metav1.ObjectMeta{ @@ -147,220 +171,119 @@ func TestObjectResponseFilter(t *testing.T) { }, }, }, - "not serviceList": { - group: "", - version: "v1", - resources: "pods", - accept: "application/json", - originalList: &corev1.PodList{ - Items: []corev1.Pod{ + "discard all services for serviceList": { + responseObj: &corev1.ServiceList{ + Items: []corev1.Service{ { ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", + Name: "svc1", Namespace: "default", }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "nginx", - Image: "nginx", - }, - }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, }, }, - }, - }, - expectResult: &corev1.PodList{ - Items: []corev1.Pod{ { ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "default", + Name: "x-tunnel-server-internal-svc", + Namespace: "kube-system", }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "nginx", - Image: "nginx", - }, - }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, }, }, }, }, + expectObj: &corev1.ServiceList{}, }, - } - - for k, tt := range testcases { - t.Run(k, func(t *testing.T) { - fh := &discardCloudServiceFilterHandler{ - serializer: serializer.NewSerializerManager(). - CreateSerializer(tt.accept, tt.group, tt.version, tt.resources), - } - - originalBytes, err := fh.serializer.Encode(tt.originalList) - if err != nil { - t.Errorf("encode originalList error: %v\n", err) - } - - filteredBytes, err := fh.ObjectResponseFilter(originalBytes) - if err != nil { - t.Errorf("ObjectResponseFilter got error: %v\n", err) - } - - expectedBytes, err := fh.serializer.Encode(tt.expectResult) - if err != nil { - t.Errorf("encode expectedResult error: %v\n", err) - } - - if !bytes.Equal(filteredBytes, expectedBytes) { - result, _ := fh.serializer.Decode(filteredBytes) - t.Errorf("ObjectResponseFilter got error, expected: \n%v\nbut got: \n%v\n", tt.expectResult, result) - } - }) - } -} - -func TestStreamResponseFilter(t *testing.T) { - testcases := map[string]struct { - group string - version string - resources string - accept string - inputObj []watch.Event - expectResult []runtime.Object - }{ - "watch services that contain LoadBalancer service with SkipDiscardServiceAnnotation is not true": { - group: "", - version: "v1", - resources: "services", - accept: "application/json", - inputObj: []watch.Event{ - {Type: watch.Modified, Object: &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "svc1", - Namespace: "default", - Annotations: map[string]string{ - filter.SkipDiscardServiceAnnotation: "false", - }, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "10.96.105.187", - Type: corev1.ServiceTypeLoadBalancer, - }, - }}, - {Type: watch.Modified, Object: &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "svc2", - Namespace: "default", - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "10.96.105.188", - Type: corev1.ServiceTypeClusterIP, - }, - }}, + "discard lb service": { + responseObj: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, }, - expectResult: []runtime.Object{ - &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "svc2", - Namespace: "default", - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "10.96.105.188", - Type: corev1.ServiceTypeClusterIP, - }, + expectObj: nil, + }, + "discard cloud clusterIP service": { + responseObj: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "x-tunnel-server-internal-svc", + Namespace: "kube-system", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeClusterIP, }, }, + expectObj: nil, }, - "watch services that contain LoadBalancer service, but SkipDiscardServiceAnnotation is true": { - group: "", - version: "v1", - resources: "services", - accept: "application/json", - inputObj: []watch.Event{ - {Type: watch.Modified, Object: &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "svc1", - Namespace: "default", - Annotations: map[string]string{ - filter.SkipDiscardServiceAnnotation: "true", - }, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "10.96.105.187", - Type: corev1.ServiceTypeLoadBalancer, + "skip lb service": { + responseObj: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + filter.SkipDiscardServiceAnnotation: "true", }, - }}, - {Type: watch.Modified, Object: &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "svc2", - Namespace: "default", - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "10.96.105.188", - Type: corev1.ServiceTypeClusterIP, - }, - }}, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, }, - expectResult: []runtime.Object{ - &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "svc1", - Namespace: "default", - Annotations: map[string]string{ - filter.SkipDiscardServiceAnnotation: "true", - }, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "10.96.105.187", - Type: corev1.ServiceTypeLoadBalancer, + expectObj: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + filter.SkipDiscardServiceAnnotation: "true", }, }, - &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "svc2", - Namespace: "default", - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "10.96.105.188", - Type: corev1.ServiceTypeClusterIP, - }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, }, }, }, - "watch pods": { - group: "", - version: "v1", - resources: "services", - accept: "application/json", - inputObj: []watch.Event{ - {Type: watch.Modified, Object: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "default", - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "nginx", - Image: "nginx", + "skip podList": { + responseObj: &corev1.PodList{ + Items: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, }, }, }, - }}, + }, }, - expectResult: []runtime.Object{ - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "default", - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "nginx", - Image: "nginx", + expectObj: &corev1.PodList{ + Items: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, }, }, }, @@ -371,40 +294,15 @@ func TestStreamResponseFilter(t *testing.T) { for k, tt := range testcases { t.Run(k, func(t *testing.T) { - fh := &discardCloudServiceFilterHandler{ - serializer: serializer.NewSerializerManager(). - CreateSerializer(tt.accept, tt.group, tt.version, tt.resources), - } - - r, w := io.Pipe() - go func(w *io.PipeWriter) { - for i := range tt.inputObj { - if _, err := fh.serializer.WatchEncode(w, &tt.inputObj[i]); err != nil { - t.Errorf("%d: encode watch unexpected error: %v", i, err) - continue - } - time.Sleep(100 * time.Millisecond) - } - w.Close() - }(w) - - rc := io.NopCloser(r) - ch := make(chan watch.Event, len(tt.inputObj)) - - go func(rc io.ReadCloser, ch chan watch.Event) { - fh.StreamResponseFilter(rc, ch) - }(rc, ch) - - for i := 0; i < len(tt.expectResult); i++ { - event := <-ch - - resultBytes, _ := fh.serializer.Encode(event.Object) - expectedBytes, _ := fh.serializer.Encode(tt.expectResult[i]) + fh := &discardCloudServiceFilterHandler{} - if !bytes.Equal(resultBytes, expectedBytes) { - t.Errorf("StreamResponseFilter got error, expected: \n%v\nbut got: \n%v\n", tt.expectResult[i], event.Object) - break + newObj, isNil := fh.RuntimeObjectFilter(tt.responseObj) + if tt.expectObj == nil { + if !isNil { + t.Errorf("RuntimeObjectFilter expect nil obj, but got %v", newObj) } + } else if !reflect.DeepEqual(newObj, tt.expectObj) { + t.Errorf("RuntimeObjectFilter got error, expected: \n%v\nbut got: \n%v\n, isNil=%v", tt.expectObj, newObj, isNil) } }) } diff --git a/pkg/yurthub/filter/filter.go b/pkg/yurthub/filter/filter.go index 9dd7d5c45e7..a4d7a0c9f20 100644 --- a/pkg/yurthub/filter/filter.go +++ b/pkg/yurthub/filter/filter.go @@ -31,6 +31,7 @@ import ( "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" + "github.com/openyurtio/openyurt/pkg/yurthub/util" ) type Factory func() (Runner, error) @@ -115,89 +116,149 @@ func (fis FilterInitializers) Initialize(ins Runner) error { } type filterReadCloser struct { - req *http.Request - rc io.ReadCloser - data *bytes.Buffer - ch chan watch.Event - handler Handler - isWatch bool - serializer *serializer.Serializer - ownerName string - stopCh <-chan struct{} + rc io.ReadCloser + filterCache *bytes.Buffer + watchDataCh chan *bytes.Buffer + serializer *serializer.Serializer + handler ObjectHandler + isWatch bool + ownerName string + stopCh <-chan struct{} } // NewFilterReadCloser create an filterReadCloser object func NewFilterReadCloser( req *http.Request, + sm *serializer.SerializerManager, rc io.ReadCloser, - handler Handler, - serializer *serializer.Serializer, + handler ObjectHandler, ownerName string, stopCh <-chan struct{}) (int, io.ReadCloser, error) { - ctx := req.Context() info, _ := apirequest.RequestInfoFrom(ctx) - dr := &filterReadCloser{ - req: req, - rc: rc, - ch: make(chan watch.Event), - data: new(bytes.Buffer), - handler: handler, - isWatch: info.Verb == "watch", - serializer: serializer, - ownerName: ownerName, - stopCh: stopCh, - } - - if dr.isWatch { - go func(req *http.Request, rc io.ReadCloser, ch chan watch.Event) { - err := handler.StreamResponseFilter(rc, ch) + respContentType, _ := util.RespContentTypeFrom(ctx) + s := CreateSerializer(respContentType, info, sm) + if s == nil { + klog.Errorf("skip filter, failed to create serializer in %s", ownerName) + return 0, rc, nil + } + + frc := &filterReadCloser{ + rc: rc, + watchDataCh: make(chan *bytes.Buffer), + filterCache: new(bytes.Buffer), + serializer: s, + handler: handler, + isWatch: info.Verb == "watch", + ownerName: ownerName, + stopCh: stopCh, + } + + if frc.isWatch { + go func(req *http.Request, rc io.ReadCloser, ch chan *bytes.Buffer) { + err := frc.StreamResponseFilter(rc, ch) if err != nil && err != io.EOF && !errors.Is(err, context.Canceled) { - klog.Errorf("filter(%s) watch response ended with error, %v", dr.ownerName, err) + klog.Errorf("filter(%s) watch response ended with error, %v", frc.ownerName, err) } - }(req, rc, dr.ch) - return 0, dr, nil + }(req, rc, frc.watchDataCh) + return 0, frc, nil } else { - var newData []byte - n, err := dr.data.ReadFrom(rc) - if err != nil { - return int(n), dr, err - } - - newData, err = handler.ObjectResponseFilter(dr.data.Bytes()) - dr.data = bytes.NewBuffer(newData) - return len(newData), dr, err + var err error + frc.filterCache, err = frc.ObjectResponseFilter(rc) + return frc.filterCache.Len(), frc, err } } -// Read read data into p and write into pipe -func (dr *filterReadCloser) Read(p []byte) (int, error) { - if dr.isWatch { +// Read get data into p and write into pipe +func (frc *filterReadCloser) Read(p []byte) (int, error) { + var ok bool + if frc.isWatch { + if frc.filterCache.Len() != 0 { + return frc.filterCache.Read(p) + } else { + frc.filterCache.Reset() + } + select { - case watchEvent, ok := <-dr.ch: + case frc.filterCache, ok = <-frc.watchDataCh: if !ok { return 0, io.EOF } + return frc.filterCache.Read(p) + } + } else { + return frc.filterCache.Read(p) + } +} - buf := &bytes.Buffer{} - n, err := dr.serializer.WatchEncode(buf, &watchEvent) - if err != nil { - klog.Errorf("filter(%s) failed to encode resource in Reader %v", dr.ownerName, err) - return 0, err - } - copied := copy(p, buf.Bytes()) - if copied != n { - return 0, fmt.Errorf("filter(%s) expect copy %d bytes, but only %d bytes copyied", dr.ownerName, n, copied) - } +// Close will close readers +func (frc *filterReadCloser) Close() error { + if frc.filterCache != nil { + frc.filterCache.Reset() + } + return frc.rc.Close() +} + +func (frc *filterReadCloser) ObjectResponseFilter(rc io.ReadCloser) (*bytes.Buffer, error) { + var buf bytes.Buffer + _, err := buf.ReadFrom(rc) + if err != nil { + return &buf, err + } + obj, err := frc.serializer.Decode(buf.Bytes()) + if err != nil || obj == nil { + klog.Errorf("skip filter, failed to decode response in HandleObjectResponse of %s %v", frc.ownerName, err) + return &buf, nil + } + + filteredObj, isNil := frc.handler.RuntimeObjectFilter(obj) + if isNil { + return &buf, nil + } + + newData, err := frc.serializer.Encode(filteredObj) + return bytes.NewBuffer(newData), err +} - return n, nil +func (frc *filterReadCloser) StreamResponseFilter(rc io.ReadCloser, ch chan *bytes.Buffer) error { + defer close(ch) + + d, err := frc.serializer.WatchDecoder(rc) + if err != nil { + klog.Errorf("failed to get watch decoder in StreamResponseFilter of %s, %v", frc.ownerName, err) + return err + } + + for { + watchType, obj, err := d.Decode() + if err != nil { + return err } - } else { - return dr.data.Read(p) + + newObj, isNil := frc.handler.RuntimeObjectFilter(obj) + if isNil { + continue + } + + wEvent := watch.Event{ + Type: watchType, + Object: newObj, + } + + buf := &bytes.Buffer{} + _, err = frc.serializer.WatchEncode(buf, &wEvent) + if err != nil { + klog.Errorf("failed to encode resource in StreamResponseFilter of %s, %v", frc.ownerName, err) + return err + } + ch <- buf } } -// Close close readers -func (dr *filterReadCloser) Close() error { - return dr.rc.Close() +func CreateSerializer(respContentType string, info *apirequest.RequestInfo, sm *serializer.SerializerManager) *serializer.Serializer { + if respContentType == "" || info == nil || info.APIVersion == "" || info.Resource == "" { + klog.Infof("CreateSerializer failed , info is :%+v", info) + return nil + } + return sm.CreateSerializer(respContentType, info.APIGroup, info.APIVersion, info.Resource) } diff --git a/pkg/yurthub/filter/filter_test.go b/pkg/yurthub/filter/filter_test.go new file mode 100644 index 00000000000..05de02a40e5 --- /dev/null +++ b/pkg/yurthub/filter/filter_test.go @@ -0,0 +1,471 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "bytes" + "io" + "net/http" + "net/http/httptest" + "reflect" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/endpoints/filters" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" + "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" + hubutil "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +type nopRunner struct { + name string +} + +func (nr *nopRunner) Name() string { + return nr.name +} + +func (nr *nopRunner) SupportedResourceAndVerbs() map[string]sets.String { + return map[string]sets.String{} +} + +func (nr *nopRunner) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { + return 0, rc, nil +} + +func registerAllFilters(filters *Filters) { + filters.Register(ServiceTopologyFilterName, func() (Runner, error) { + return &nopRunner{name: ServiceTopologyFilterName}, nil + }) + filters.Register(DiscardCloudServiceFilterName, func() (Runner, error) { + return &nopRunner{name: DiscardCloudServiceFilterName}, nil + }) + filters.Register(MasterServiceFilterName, func() (Runner, error) { + return &nopRunner{name: MasterServiceFilterName}, nil + }) +} + +type nopInitializer struct{} + +func (nopInit *nopInitializer) Initialize(_ Runner) error { + return nil +} + +func TestNewFromFilters(t *testing.T) { + allFilters := []string{MasterServiceFilterName, DiscardCloudServiceFilterName, ServiceTopologyFilterName} + testcases := map[string]struct { + disabledFilters []string + generatedFilters sets.String + }{ + "disable master service filter": { + disabledFilters: []string{MasterServiceFilterName}, + generatedFilters: sets.NewString(allFilters...).Delete(MasterServiceFilterName), + }, + "disable service topology filter": { + disabledFilters: []string{ServiceTopologyFilterName}, + generatedFilters: sets.NewString(allFilters...).Delete(ServiceTopologyFilterName), + }, + "disable discard cloud service filter": { + disabledFilters: []string{DiscardCloudServiceFilterName}, + generatedFilters: sets.NewString(allFilters...).Delete(DiscardCloudServiceFilterName), + }, + } + + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + filters := NewFilters(tt.disabledFilters) + registerAllFilters(filters) + + runners, err := filters.NewFromFilters(&nopInitializer{}) + if err != nil { + t.Errorf("failed to new from filters, %v", err) + } + + gotRunners := sets.NewString() + for i := range runners { + gotRunners.Insert(runners[i].Name()) + } + + if !gotRunners.Equal(tt.generatedFilters) { + t.Errorf("expect filters %v, but got %v", tt.generatedFilters, gotRunners) + } + }) + } +} + +type nopObjectHandler struct{} + +func (noh *nopObjectHandler) RuntimeObjectFilter(obj runtime.Object) (runtime.Object, bool) { + return obj, false +} + +func TestFilterReadCloser_Read_List(t *testing.T) { + resolver := newTestRequestInfoResolver() + sm := serializer.NewSerializerManager() + handler := &nopObjectHandler{} + stopCh := make(chan struct{}) + + testcases := map[string]struct { + path string + listObj runtime.Object + stepSize int + expectObj runtime.Object + }{ + "read list response in one time": { + path: "/api/v1/services", + listObj: &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + SkipDiscardServiceAnnotation: "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + stepSize: 32 * 1024, + expectObj: &corev1.ServiceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "ServiceList", + APIVersion: "v1", + }, + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + SkipDiscardServiceAnnotation: "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + }, + "read list response in multiple times": { + path: "/api/v1/services", + listObj: &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + SkipDiscardServiceAnnotation: "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + stepSize: 8, + expectObj: &corev1.ServiceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "ServiceList", + APIVersion: "v1", + }, + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + SkipDiscardServiceAnnotation: "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.188", + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + }, + } + + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + req, err := http.NewRequest("GET", tt.path, nil) + if err != nil { + t.Errorf("failed to create request, %v", err) + } + + req.RemoteAddr = "127.0.0.1" + req.Header.Set("Accept", "application/json") + + var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + reqContentType, _ := hubutil.ReqContentTypeFrom(ctx) + ctx = hubutil.WithRespContentType(ctx, reqContentType) + req = req.WithContext(ctx) + info, _ := apirequest.RequestInfoFrom(ctx) + s := CreateSerializer(reqContentType, info, sm) + + listBytes, _ := s.Encode(tt.listObj) + buf := bytes.NewBuffer(listBytes) + rc := io.NopCloser(buf) + + size, newRc, err := NewFilterReadCloser(req, sm, rc, handler, "foo", stopCh) + if err != nil { + t.Errorf("failed new filter readcloser, %v", err) + } + + var resBuf bytes.Buffer + for { + b := make([]byte, tt.stepSize) + n, err := newRc.Read(b) + if err != nil && err != io.EOF { + t.Errorf("failed to read response %v", err) + } else if err == io.EOF { + break + } + + resBuf.Write(b[:n]) + } + + if size != 0 && size != resBuf.Len() { + t.Errorf("expect %d bytes, but got %d bytes", size, resBuf.Len()) + } + + readObj, _ := s.Decode(resBuf.Bytes()) + if !reflect.DeepEqual(tt.expectObj, readObj) { + t.Errorf("expect object \n%#+v\n, but got \n%#+v\n", tt.expectObj, readObj) + } + newRc.Close() + }) + + handler = util.WithRequestContentType(handler) + handler = filters.WithRequestInfo(handler, resolver) + handler.ServeHTTP(httptest.NewRecorder(), req) + }) + } +} + +func TestFilterReadCloser_Read_Watch(t *testing.T) { + resolver := newTestRequestInfoResolver() + sm := serializer.NewSerializerManager() + handler := &nopObjectHandler{} + stopCh := make(chan struct{}) + + testcases := map[string]struct { + path string + eventType watch.EventType + watchObject runtime.Object + stepSize int + expectObj runtime.Object + }{ + "read watch response in one time": { + path: "/api/v1/services?watch=true", + eventType: watch.Added, + watchObject: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + SkipDiscardServiceAnnotation: "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + stepSize: 32 * 1024, + expectObj: &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + SkipDiscardServiceAnnotation: "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + }, + "read watch response in multiple times": { + path: "/api/v1/services?watch=true", + eventType: watch.Added, + watchObject: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + SkipDiscardServiceAnnotation: "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + stepSize: 32, + expectObj: &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + Annotations: map[string]string{ + SkipDiscardServiceAnnotation: "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + }, + } + + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + req, err := http.NewRequest("GET", tt.path, nil) + if err != nil { + t.Errorf("failed to create request, %v", err) + } + + req.RemoteAddr = "127.0.0.1" + req.Header.Set("Accept", "application/json") + + var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + reqContentType, _ := hubutil.ReqContentTypeFrom(ctx) + ctx = hubutil.WithRespContentType(ctx, reqContentType) + req = req.WithContext(ctx) + info, _ := apirequest.RequestInfoFrom(ctx) + s := CreateSerializer(reqContentType, info, sm) + + var buf bytes.Buffer + event := &watch.Event{ + Type: tt.eventType, + Object: tt.watchObject, + } + initSize, err := s.WatchEncode(&buf, event) + if err != nil { + t.Errorf("failed to prepare watch data, %v", err) + } + rc := io.NopCloser(&buf) + + _, newRc, err := NewFilterReadCloser(req, sm, rc, handler, "foo", stopCh) + if err != nil { + t.Errorf("failed new filter readcloser, %v", err) + } + + var resBuf bytes.Buffer + for { + b := make([]byte, tt.stepSize) + n, err := newRc.Read(b) + if err != nil && err != io.EOF { + t.Errorf("failed to read response %v", err) + } else if err == io.EOF { + break + } + + resBuf.Write(b[:n]) + } + + if initSize != resBuf.Len() { + t.Errorf("expect %d bytes, but got %d bytes", initSize, resBuf.Len()) + } + + resDecoder, _ := s.WatchDecoder(io.NopCloser(&resBuf)) + eType, resObj, _ := resDecoder.Decode() + if eType != tt.eventType { + t.Errorf("expect event type %s, but got %s", tt.eventType, eType) + } + + if !reflect.DeepEqual(tt.expectObj, resObj) { + t.Errorf("expect object \n%#+v\n, but got \n%#+v\n", tt.expectObj, resObj) + } + newRc.Close() + }) + + handler = util.WithRequestContentType(handler) + handler = filters.WithRequestInfo(handler, resolver) + handler.ServeHTTP(httptest.NewRecorder(), req) + }) + } +} diff --git a/pkg/yurthub/filter/initializer/initializer.go b/pkg/yurthub/filter/initializer/initializer.go index ecd2c2519c5..d550f1934f6 100644 --- a/pkg/yurthub/filter/initializer/initializer.go +++ b/pkg/yurthub/filter/initializer/initializer.go @@ -21,7 +21,6 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/filter" - "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/util" yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" ) @@ -41,11 +40,6 @@ type WantsNodeName interface { SetNodeName(nodeName string) error } -// WantsSerializerManager is an interface for setting serializer manager -type WantsSerializerManager interface { - SetSerializerManager(s *serializer.SerializerManager) error -} - // WantsStorageWrapper is an interface for setting StorageWrapper type WantsStorageWrapper interface { SetStorageWrapper(s cachemanager.StorageWrapper) error @@ -66,7 +60,6 @@ type WantsWorkingMode interface { type genericFilterInitializer struct { factory informers.SharedInformerFactory yurtFactory yurtinformers.SharedInformerFactory - serializerManager *serializer.SerializerManager storageWrapper cachemanager.StorageWrapper nodeName string masterServiceHost string @@ -77,14 +70,12 @@ type genericFilterInitializer struct { // New creates an filterInitializer object func New(factory informers.SharedInformerFactory, yurtFactory yurtinformers.SharedInformerFactory, - sm *serializer.SerializerManager, sw cachemanager.StorageWrapper, nodeName, masterServiceHost, masterServicePort string, workingMode util.WorkingMode) *genericFilterInitializer { return &genericFilterInitializer{ factory: factory, yurtFactory: yurtFactory, - serializerManager: sm, storageWrapper: sw, nodeName: nodeName, masterServiceHost: masterServiceHost, @@ -129,12 +120,6 @@ func (fi *genericFilterInitializer) Initialize(ins filter.Runner) error { } } - if wants, ok := ins.(WantsSerializerManager); ok { - if err := wants.SetSerializerManager(fi.serializerManager); err != nil { - return err - } - } - if wants, ok := ins.(WantsStorageWrapper); ok { if err := wants.SetStorageWrapper(fi.storageWrapper); err != nil { return err diff --git a/pkg/yurthub/filter/interfaces.go b/pkg/yurthub/filter/interfaces.go index 76664c51f14..7dde7720dc3 100644 --- a/pkg/yurthub/filter/interfaces.go +++ b/pkg/yurthub/filter/interfaces.go @@ -21,8 +21,8 @@ import ( "net/http" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/watch" ) type FilterInitializer interface { @@ -45,16 +45,12 @@ type Runner interface { Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) } -// Handler customizes data filtering processing interface for each handler. -// In the data filtering framework, data is mainly divided into two types: -// Object data: data returned by list/get request. -// Streaming data: The data returned by the watch request will be continuously pushed to the edge by the cloud. -type Handler interface { - // StreamResponseFilter is used to filter processing of streaming data. - StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error - - // ObjectResponseFilter is used to filter processing of object data. - ObjectResponseFilter(b []byte) ([]byte, error) +// ObjectHandler is used for filtering runtime object. +// runtime object includes List object(like ServiceList) that has multiple items and +// Standalone object(like Service). +// the second return value(bool): is used to specify the returned object is nil or not. +type ObjectHandler interface { + RuntimeObjectFilter(obj runtime.Object) (runtime.Object, bool) } type NodeGetter func(name string) (*v1.Node, error) diff --git a/pkg/yurthub/filter/manager/manager.go b/pkg/yurthub/filter/manager/manager.go index bd2f6359b01..823071b256b 100644 --- a/pkg/yurthub/filter/manager/manager.go +++ b/pkg/yurthub/filter/manager/manager.go @@ -54,7 +54,7 @@ func NewFilterManager(options *options.YurtHubOptions, options.DisabledResourceFilters = append(options.DisabledResourceFilters, filter.DisabledInCloudMode...) } filters := filter.NewFilters(options.DisabledResourceFilters) - registerAllFilters(filters) + registerAllFilters(filters, serializerManager) mutatedMasterServiceHost, mutatedMasterServicePort, _ := net.SplitHostPort(apiserverAddr) if options.AccessServerThroughHub { @@ -66,7 +66,7 @@ func NewFilterManager(options *options.YurtHubOptions, } } - runners, err := createFilterRunners(filters, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, util.WorkingMode(options.WorkingMode), options.NodeName, mutatedMasterServiceHost, mutatedMasterServicePort) + runners, err := createFilterRunners(filters, sharedFactory, yurtSharedFactory, storageWrapper, util.WorkingMode(options.WorkingMode), options.NodeName, mutatedMasterServiceHost, mutatedMasterServicePort) if err != nil { return nil, err } @@ -100,7 +100,6 @@ func (m *Manager) FindRunner(req *http.Request) (bool, filter.Runner) { func createFilterRunners(filters *filter.Filters, sharedFactory informers.SharedInformerFactory, yurtSharedFactory yurtinformers.SharedInformerFactory, - serializerManager *serializer.SerializerManager, storageWrapper cachemanager.StorageWrapper, workingMode util.WorkingMode, nodeName, mutatedMasterServiceHost, mutatedMasterServicePort string) ([]filter.Runner, error) { @@ -108,7 +107,7 @@ func createFilterRunners(filters *filter.Filters, return nil, nil } - genericInitializer := initializer.New(sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, nodeName, mutatedMasterServiceHost, mutatedMasterServicePort, workingMode) + genericInitializer := initializer.New(sharedFactory, yurtSharedFactory, storageWrapper, nodeName, mutatedMasterServiceHost, mutatedMasterServicePort, workingMode) initializerChain := filter.FilterInitializers{} initializerChain = append(initializerChain, genericInitializer) return filters.NewFromFilters(initializerChain) @@ -116,8 +115,8 @@ func createFilterRunners(filters *filter.Filters, // registerAllFilters by order, the front registered filter will be // called before the latter registered ones. -func registerAllFilters(filters *filter.Filters) { - servicetopology.Register(filters) - masterservice.Register(filters) - discardcloudservice.Register(filters) +func registerAllFilters(filters *filter.Filters, sm *serializer.SerializerManager) { + servicetopology.Register(filters, sm) + masterservice.Register(filters, sm) + discardcloudservice.Register(filters, sm) } diff --git a/pkg/yurthub/filter/masterservice/filter.go b/pkg/yurthub/filter/masterservice/filter.go index 9f704b6df50..7ad56fb6888 100644 --- a/pkg/yurthub/filter/masterservice/filter.go +++ b/pkg/yurthub/filter/masterservice/filter.go @@ -22,22 +22,22 @@ import ( "strconv" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/filter" - filterutil "github.com/openyurtio/openyurt/pkg/yurthub/filter/util" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" ) // Register registers a filter -func Register(filters *filter.Filters) { +func Register(filters *filter.Filters, sm *serializer.SerializerManager) { filters.Register(filter.MasterServiceFilterName, func() (filter.Runner, error) { - return NewFilter(), nil + return NewFilter(sm), nil }) } -func NewFilter() *masterServiceFilter { - return &masterServiceFilter{} +func NewFilter(sm *serializer.SerializerManager) *masterServiceFilter { + return &masterServiceFilter{ + serializerManager: sm, + } } type masterServiceFilter struct { @@ -56,11 +56,6 @@ func (msf *masterServiceFilter) SupportedResourceAndVerbs() map[string]sets.Stri } } -func (msf *masterServiceFilter) SetSerializerManager(s *serializer.SerializerManager) error { - msf.serializerManager = s - return nil -} - func (msf *masterServiceFilter) SetMasterServiceHost(host string) error { msf.host = host return nil @@ -77,12 +72,6 @@ func (msf *masterServiceFilter) SetMasterServicePort(portStr string) error { } func (msf *masterServiceFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { - s := filterutil.CreateSerializer(req, msf.serializerManager) - if s == nil { - klog.Errorf("skip filter, failed to create serializer in masterServiceFilter") - return 0, rc, nil - } - - handler := NewMasterServiceFilterHandler(req, s, msf.host, msf.port) - return filter.NewFilterReadCloser(req, rc, handler, s, filter.MasterServiceFilterName, stopCh) + handler := NewMasterServiceFilterHandler(msf.host, msf.port) + return filter.NewFilterReadCloser(req, msf.serializerManager, rc, handler, msf.Name(), stopCh) } diff --git a/pkg/yurthub/filter/masterservice/handler.go b/pkg/yurthub/filter/masterservice/handler.go index 6468038115e..c304107ab12 100644 --- a/pkg/yurthub/filter/masterservice/handler.go +++ b/pkg/yurthub/filter/masterservice/handler.go @@ -17,17 +17,11 @@ limitations under the License. package masterservice import ( - "io" - "net/http" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/filter" - "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" - "github.com/openyurtio/openyurt/pkg/yurthub/util" ) const ( @@ -37,99 +31,49 @@ const ( ) type masterServiceFilterHandler struct { - req *http.Request - serializer *serializer.Serializer - host string - port int32 + host string + port int32 } -func NewMasterServiceFilterHandler( - req *http.Request, - serializer *serializer.Serializer, - host string, - port int32) filter.Handler { +func NewMasterServiceFilterHandler(host string, port int32) filter.ObjectHandler { return &masterServiceFilterHandler{ - req: req, - serializer: serializer, - host: host, - port: port, + host: host, + port: port, } } -// ObjectResponseFilter mutate master service(default/kubernetes) in the ServiceList object -func (fh *masterServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) { - list, err := fh.serializer.Decode(b) - if err != nil || list == nil { - klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of masterServiceFilterHandler, %v", err) - return b, nil - } - - // return data un-mutated if not ServiceList - serviceList, ok := list.(*v1.ServiceList) - if !ok { - return b, nil - } - - // mutate master service - for i := range serviceList.Items { - if serviceList.Items[i].Namespace == MasterServiceNamespace && serviceList.Items[i].Name == MasterServiceName { - serviceList.Items[i].Spec.ClusterIP = fh.host - for j := range serviceList.Items[i].Spec.Ports { - if serviceList.Items[i].Spec.Ports[j].Name == MasterServicePortName { - serviceList.Items[i].Spec.Ports[j].Port = fh.port - break - } +// RuntimeObjectFilter mutate master service(default/kubernetes) in the response object +func (fh *masterServiceFilterHandler) RuntimeObjectFilter(obj runtime.Object) (runtime.Object, bool) { + switch v := obj.(type) { + case *v1.ServiceList: + for i := range v.Items { + newSvc, mutated := fh.mutateMasterService(&v.Items[i]) + if mutated { + v.Items[i] = *newSvc + break } - klog.V(2).Infof("mutate master service into ClusterIP:Port=%s:%d for request %s", fh.host, fh.port, util.ReqString(fh.req)) - break } + return v, false + case *v1.Service: + svc, _ := fh.mutateMasterService(v) + return svc, false + default: + return v, false } - - // return the mutated serviceList - return fh.serializer.Encode(serviceList) } -//StreamResponseFilter mutate master service(default/kubernetes) in Watch Stream -func (fh *masterServiceFilterHandler) StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error { - defer func() { - close(ch) - }() - - d, err := fh.serializer.WatchDecoder(rc) - if err != nil { - klog.Errorf("StreamResponseFilter for master service ended with error, %v", err) - return err - } - - for { - watchType, obj, err := d.Decode() - if err != nil { - return err - } - - var wEvent watch.Event - wEvent.Type = watchType - // return data un-mutated if not Service - service, ok := obj.(*v1.Service) - if ok && service.Namespace == MasterServiceNamespace && service.Name == MasterServiceName { - service.Spec.ClusterIP = fh.host - for j := range service.Spec.Ports { - if service.Spec.Ports[j].Name == MasterServicePortName { - service.Spec.Ports[j].Port = fh.port - break - } +func (fh *masterServiceFilterHandler) mutateMasterService(svc *v1.Service) (*v1.Service, bool) { + mutated := false + if svc.Namespace == MasterServiceNamespace && svc.Name == MasterServiceName { + svc.Spec.ClusterIP = fh.host + for j := range svc.Spec.Ports { + if svc.Spec.Ports[j].Name == MasterServicePortName { + svc.Spec.Ports[j].Port = fh.port + break } - klog.V(2).Infof("mutate master service into ClusterIP:Port=%s:%d for request %s", fh.host, fh.port, util.ReqString(fh.req)) - wEvent.Object = service - } else { - accessor := meta.NewAccessor() - ns, _ := accessor.Namespace(obj) - name, _ := accessor.Name(obj) - kind, _ := accessor.Kind(obj) - klog.V(2).Infof("skip filter, not master service(%s: %s/%s) for request %s", kind, ns, name, util.ReqString(fh.req)) - wEvent.Object = obj } - - ch <- wEvent + mutated = true + klog.V(2).Infof("mutate master service with ClusterIP:Port=%s:%d", fh.host, fh.port) } + return svc, mutated } diff --git a/pkg/yurthub/filter/masterservice/handler_test.go b/pkg/yurthub/filter/masterservice/handler_test.go index d49e3164af2..71e185bbe1e 100644 --- a/pkg/yurthub/filter/masterservice/handler_test.go +++ b/pkg/yurthub/filter/masterservice/handler_test.go @@ -17,47 +17,26 @@ limitations under the License. package masterservice import ( - "bytes" - "io" - "net/http" - "sync" + "reflect" "testing" - "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - - "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" ) -func TestObjectResponseFilter(t *testing.T) { +func TestRuntimeObjectFilter(t *testing.T) { fh := &masterServiceFilterHandler{ host: "169.251.2.1", port: 10268, } testcases := map[string]struct { - group string - version string - resources string - userAgent string - accept string - verb string - path string - originalList runtime.Object - expectResult runtime.Object + responseObject runtime.Object + expectObject runtime.Object }{ "serviceList contains kubernetes service": { - group: "", - version: "v1", - resources: "services", - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/services", - originalList: &corev1.ServiceList{ + responseObject: &corev1.ServiceList{ Items: []corev1.Service{ { ObjectMeta: metav1.ObjectMeta{ @@ -90,7 +69,7 @@ func TestObjectResponseFilter(t *testing.T) { }, }, }, - expectResult: &corev1.ServiceList{ + expectObject: &corev1.ServiceList{ Items: []corev1.Service{ { ObjectMeta: metav1.ObjectMeta{ @@ -125,14 +104,7 @@ func TestObjectResponseFilter(t *testing.T) { }, }, "serviceList does not contain kubernetes service": { - group: "", - version: "v1", - resources: "services", - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/services", - originalList: &corev1.ServiceList{ + responseObject: &corev1.ServiceList{ Items: []corev1.Service{ { ObjectMeta: metav1.ObjectMeta{ @@ -150,7 +122,7 @@ func TestObjectResponseFilter(t *testing.T) { }, }, }, - expectResult: &corev1.ServiceList{ + expectObject: &corev1.ServiceList{ Items: []corev1.Service{ { ObjectMeta: metav1.ObjectMeta{ @@ -169,15 +141,72 @@ func TestObjectResponseFilter(t *testing.T) { }, }, }, + "it's a kubernetes service": { + responseObject: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: MasterServiceName, + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.0.1", + Ports: []corev1.ServicePort{ + { + Port: 443, + Name: MasterServicePortName, + }, + }, + }, + }, + expectObject: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: MasterServiceName, + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: fh.host, + Ports: []corev1.ServicePort{ + { + Port: fh.port, + Name: MasterServicePortName, + }, + }, + }, + }, + }, + "it's not a kubernetes service": { + responseObject: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.0.1", + Ports: []corev1.ServicePort{ + { + Port: 443, + Name: MasterServicePortName, + }, + }, + }, + }, + expectObject: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: MasterServiceNamespace, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.0.1", + Ports: []corev1.ServicePort{ + { + Port: 443, + Name: MasterServicePortName, + }, + }, + }, + }, + }, "not serviceList": { - group: "", - version: "v1", - resources: "pods", - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods", - originalList: &corev1.PodList{ + responseObject: &corev1.PodList{ Items: []corev1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -195,7 +224,7 @@ func TestObjectResponseFilter(t *testing.T) { }, }, }, - expectResult: &corev1.PodList{ + expectObject: &corev1.PodList{ Items: []corev1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -218,257 +247,14 @@ func TestObjectResponseFilter(t *testing.T) { for k, tt := range testcases { t.Run(k, func(t *testing.T) { - req, _ := http.NewRequest(tt.verb, tt.path, nil) - req.Header.Set("User-Agent", tt.userAgent) - req.Header.Set("Accept", tt.accept) - fh.req = req - fh.serializer = serializer.NewSerializerManager(). - CreateSerializer(tt.accept, tt.group, tt.version, tt.resources) - - originalBytes, err := fh.serializer.Encode(tt.originalList) - if err != nil { - t.Errorf("encode originalList error: %v\n", err) - } - - filteredBytes, err := fh.ObjectResponseFilter(originalBytes) - if err != nil { - t.Errorf("ObjectResponseFilter got error: %v\n", err) - } - - expectedBytes, err := fh.serializer.Encode(tt.expectResult) - if err != nil { - t.Errorf("encode expectedResult error: %v\n", err) - } - - if !bytes.Equal(filteredBytes, expectedBytes) { - result, _ := fh.serializer.Decode(filteredBytes) - t.Errorf("ObjectResponseFilter got error, expected: \n%v\nbut got: \n%v\n", tt.expectResult, result) - } - }) - } -} - -func TestStreamResponseFilter(t *testing.T) { - fh := &masterServiceFilterHandler{ - host: "169.251.2.1", - port: 10268, - } - - testcases := map[string]struct { - group string - version string - resources string - userAgent string - accept string - verb string - path string - inputObj []watch.Event - expectResult []runtime.Object - }{ - "watch kubernetes service": { - group: "", - version: "v1", - resources: "services", - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/services?watch=true", - inputObj: []watch.Event{ - {Type: watch.Modified, Object: &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: MasterServiceName, - Namespace: MasterServiceNamespace, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "10.96.0.1", - Ports: []corev1.ServicePort{ - { - Port: 443, - Name: MasterServicePortName, - }, - }, - }, - }}, - {Type: watch.Modified, Object: &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "svc1", - Namespace: MasterServiceNamespace, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "10.96.105.188", - Ports: []corev1.ServicePort{ - { - Port: 80, - }, - }, - }, - }}, - }, - expectResult: []runtime.Object{ - &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: MasterServiceName, - Namespace: MasterServiceNamespace, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: fh.host, - Ports: []corev1.ServicePort{ - { - Port: fh.port, - Name: MasterServicePortName, - }, - }, - }, - }, - &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "svc1", - Namespace: MasterServiceNamespace, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "10.96.105.188", - Ports: []corev1.ServicePort{ - { - Port: 80, - }, - }, - }, - }, - }, - }, - "watch without kubernetes service": { - group: "", - version: "v1", - resources: "services", - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/services?watch=true", - inputObj: []watch.Event{ - {Type: watch.Modified, Object: &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "svc1", - Namespace: MasterServiceNamespace, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "10.96.188.105", - Ports: []corev1.ServicePort{ - { - Port: 80, - }, - }, - }, - }}, - }, - expectResult: []runtime.Object{ - &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "svc1", - Namespace: MasterServiceNamespace, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "10.96.188.105", - Ports: []corev1.ServicePort{ - { - Port: 80, - }, - }, - }, - }, - }, - }, - "watch pods": { - group: "", - version: "v1", - resources: "pods", - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods?watch=true", - inputObj: []watch.Event{ - {Type: watch.Modified, Object: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: MasterServiceNamespace, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "nginx", - Image: "nginx", - }, - }, - }, - }}, - }, - expectResult: []runtime.Object{ - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: MasterServiceNamespace, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "nginx", - Image: "nginx", - }, - }, - }, - }, - }, - }, - } - - for k, tt := range testcases { - t.Run(k, func(t *testing.T) { - wg := sync.WaitGroup{} - req, _ := http.NewRequest(tt.verb, tt.path, nil) - req.Header.Set("User-Agent", tt.userAgent) - req.Header.Set("Accept", tt.accept) - fh.req = req - - fh.serializer = serializer.NewSerializerManager(). - CreateSerializer(tt.accept, tt.group, tt.version, tt.resources) - - r, w := io.Pipe() - wg.Add(1) - go func(w *io.PipeWriter) { - defer wg.Done() - for i := range tt.inputObj { - if _, err := fh.serializer.WatchEncode(w, &tt.inputObj[i]); err != nil { - t.Errorf("%d: encode watch unexpected error: %v", i, err) - continue - } - time.Sleep(100 * time.Millisecond) - } - w.Close() - }(w) - - rc := io.NopCloser(r) - ch := make(chan watch.Event, len(tt.inputObj)) - - wg.Add(1) - go func(rc io.ReadCloser, ch chan watch.Event) { - defer wg.Done() - err := fh.StreamResponseFilter(rc, ch) - if err != nil && err != io.EOF { - t.Errorf("failed to filter stream at case %s, %v", k, err) - } - }(rc, ch) - - for i := 0; i < len(tt.expectResult); i++ { - event := <-ch - - resultBytes, _ := fh.serializer.Encode(event.Object) - expectedBytes, _ := fh.serializer.Encode(tt.expectResult[i]) - - if !bytes.Equal(resultBytes, expectedBytes) { - t.Errorf("StreamResponseFilter got error, expected: \n%v\nbut got: \n%v\n", tt.expectResult[i], event.Object) - break + newObj, isNil := fh.RuntimeObjectFilter(tt.responseObject) + if tt.expectObject == nil { + if !isNil { + t.Errorf("RuntimeObjectFilter expect nil obj, but got %v", newObj) } + } else if !reflect.DeepEqual(newObj, tt.expectObject) { + t.Errorf("RuntimeObjectFilter got error, expected: \n%v\nbut got: \n%v\n, isNil=%v", tt.expectObject, newObj, isNil) } - wg.Wait() }) } } diff --git a/pkg/yurthub/filter/servicetopology/filter.go b/pkg/yurthub/filter/servicetopology/filter.go index 855cecd45c0..ed894af9253 100644 --- a/pkg/yurthub/filter/servicetopology/filter.go +++ b/pkg/yurthub/filter/servicetopology/filter.go @@ -30,7 +30,6 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/filter" - filterutil "github.com/openyurtio/openyurt/pkg/yurthub/filter/util" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/util" yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" @@ -38,15 +37,16 @@ import ( ) // Register registers a filter -func Register(filters *filter.Filters) { +func Register(filters *filter.Filters, sm *serializer.SerializerManager) { filters.Register(filter.ServiceTopologyFilterName, func() (filter.Runner, error) { - return NewFilter(), nil + return NewFilter(sm), nil }) } -func NewFilter() *serviceTopologyFilter { +func NewFilter(sm *serializer.SerializerManager) *serviceTopologyFilter { return &serviceTopologyFilter{ - workingMode: util.WorkingModeEdge, + workingMode: util.WorkingModeEdge, + serializerManager: sm, } } @@ -147,21 +147,11 @@ func (ssf *serviceTopologyFilter) SetStorageWrapper(s cachemanager.StorageWrappe return nil } -func (ssf *serviceTopologyFilter) SetSerializerManager(s *serializer.SerializerManager) error { - ssf.serializerManager = s - return nil -} - func (ssf *serviceTopologyFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { if ok := cache.WaitForCacheSync(stopCh, ssf.nodeSynced, ssf.serviceSynced, ssf.nodePoolSynced); !ok { return 0, rc, nil } - s := filterutil.CreateSerializer(req, ssf.serializerManager) - if s == nil { - klog.Errorf("skip filter, failed to create serializer in serviceTopologyFilter") - return 0, rc, nil - } - handler := NewServiceTopologyFilterHandler(ssf.nodeName, s, ssf.serviceLister, ssf.nodepoolLister, ssf.nodeGetter) - return filter.NewFilterReadCloser(req, rc, handler, s, filter.ServiceTopologyFilterName, stopCh) + handler := NewServiceTopologyFilterHandler(ssf.nodeName, ssf.serviceLister, ssf.nodepoolLister, ssf.nodeGetter) + return filter.NewFilterReadCloser(req, ssf.serializerManager, rc, handler, ssf.Name(), stopCh) } diff --git a/pkg/yurthub/filter/servicetopology/handler.go b/pkg/yurthub/filter/servicetopology/handler.go index fab41251add..ce634255504 100644 --- a/pkg/yurthub/filter/servicetopology/handler.go +++ b/pkg/yurthub/filter/servicetopology/handler.go @@ -17,19 +17,15 @@ limitations under the License. package servicetopology import ( - "io" - v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" discoveryV1beta1 "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/watch" listers "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/filter" - "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" nodepoolv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1" appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1" ) @@ -47,7 +43,6 @@ var ( type serviceTopologyFilterHandler struct { nodeName string - serializer *serializer.Serializer serviceLister listers.ServiceLister nodePoolLister appslisters.NodePoolLister nodeGetter filter.NodeGetter @@ -55,27 +50,19 @@ type serviceTopologyFilterHandler struct { func NewServiceTopologyFilterHandler( nodeName string, - serializer *serializer.Serializer, serviceLister listers.ServiceLister, nodePoolLister appslisters.NodePoolLister, - nodeGetter filter.NodeGetter) filter.Handler { + nodeGetter filter.NodeGetter) filter.ObjectHandler { return &serviceTopologyFilterHandler{ nodeName: nodeName, - serializer: serializer, serviceLister: serviceLister, nodePoolLister: nodePoolLister, nodeGetter: nodeGetter, } } -//ObjectResponseFilter filter the endpointSlice or endpoints from get response object and return the bytes -func (fh *serviceTopologyFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) { - obj, err := fh.serializer.Decode(b) - if err != nil || obj == nil { - klog.Errorf("skip serviceTopologyFilterHandler: failed to decode response in ObjectResponseFilter, %v", err) - return b, nil - } - +// RuntimeObjectFilter filter the endpointSlice or endpoints from response object and return the filtered object +func (fh *serviceTopologyFilterHandler) RuntimeObjectFilter(obj runtime.Object) (runtime.Object, bool) { switch v := obj.(type) { case *discoveryV1beta1.EndpointSliceList: // filter endpointSlice before k8s 1.21 @@ -85,7 +72,7 @@ func (fh *serviceTopologyFilterHandler) ObjectResponseFilter(b []byte) ([]byte, items = append(items, *eps) } v.Items = items - return fh.serializer.Encode(v) + return v, false case *discovery.EndpointSliceList: var items []discovery.EndpointSlice for i := range v.Items { @@ -93,7 +80,7 @@ func (fh *serviceTopologyFilterHandler) ObjectResponseFilter(b []byte) ([]byte, items = append(items, *eps) } v.Items = items - return fh.serializer.Encode(v) + return v, false case *v1.EndpointsList: var items []v1.Endpoints for i := range v.Items { @@ -101,34 +88,11 @@ func (fh *serviceTopologyFilterHandler) ObjectResponseFilter(b []byte) ([]byte, items = append(items, *ep) } v.Items = items - return fh.serializer.Encode(v) + return v, false + case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discovery.EndpointSlice: + return fh.serviceTopologyHandler(v), false default: - return b, nil - } -} - -// StreamResponseFilter filter the endpointslice or endpoints from watch response object and return the bytes -func (fh *serviceTopologyFilterHandler) StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error { - defer func() { - close(ch) - }() - - d, err := fh.serializer.WatchDecoder(rc) - if err != nil { - klog.Errorf("StreamResponseFilter of serviceTopologyFilterHandler ended with error, %v", err) - return err - } - - for { - watchType, obj, err := d.Decode() - if err != nil { - return err - } - - ch <- watch.Event{ - Type: watchType, - Object: fh.serviceTopologyHandler(obj), - } + return obj, false } } diff --git a/pkg/yurthub/filter/servicetopology/handler_test.go b/pkg/yurthub/filter/servicetopology/handler_test.go index a3cc2fddeba..698142dfb33 100644 --- a/pkg/yurthub/filter/servicetopology/handler_test.go +++ b/pkg/yurthub/filter/servicetopology/handler_test.go @@ -18,8 +18,6 @@ package servicetopology import ( "context" - "net/http" - "net/http/httptest" "reflect" "testing" "time" @@ -29,43 +27,27 @@ import ( discoveryV1beta1 "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apiserver/pkg/endpoints/filters" - "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/informers" k8sfake "k8s.io/client-go/kubernetes/fake" - filterutil "github.com/openyurtio/openyurt/pkg/yurthub/filter/util" - "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" - "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" - hubutil "github.com/openyurtio/openyurt/pkg/yurthub/util" nodepoolv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1" yurtfake "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned/fake" yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" ) -func newTestRequestInfoResolver() *request.RequestInfoFactory { - return &request.RequestInfoFactory{ - APIPrefixes: sets.NewString("api", "apis"), - GrouplessAPIPrefixes: sets.NewString("api"), - } -} - -func TestServiceTopologyHandler(t *testing.T) { +func TestRuntimeObjectFilter(t *testing.T) { currentNodeName := "node1" nodeName2 := "node2" nodeName3 := "node3" testcases := map[string]struct { - path string - object runtime.Object - kubeClient *k8sfake.Clientset - yurtClient *yurtfake.Clientset - expectResult runtime.Object + responseObject runtime.Object + kubeClient *k8sfake.Clientset + yurtClient *yurtfake.Clientset + expectObject runtime.Object }{ "v1beta1.EndpointSliceList: topologyKeys is kubernetes.io/hostname": { - path: "/apis/discovery.k8s.io/v1beta1/endpointslices", - object: &discoveryV1beta1.EndpointSliceList{ + responseObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -176,11 +158,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discoveryV1beta1.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1beta1", - }, + expectObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -213,8 +191,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1beta1.EndpointSliceList: topologyKeys is openyurt.io/nodepool": { - path: "/apis/discovery.k8s.io/v1beta1/endpointslices", - object: &discoveryV1beta1.EndpointSliceList{ + responseObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -325,11 +302,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discoveryV1beta1.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1beta1", - }, + expectObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -370,8 +343,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1beta1.EndpointSliceList: topologyKeys is kubernetes.io/zone": { - path: "/apis/discovery.k8s.io/v1beta1/endpointslices", - object: &discoveryV1beta1.EndpointSliceList{ + responseObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -482,11 +454,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discoveryV1beta1.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1beta1", - }, + expectObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -527,8 +495,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1beta1.EndpointSliceList: without openyurt.io/topologyKeys": { - path: "/apis/discovery.k8s.io/v1beta1/endpointslices", - object: &discoveryV1beta1.EndpointSliceList{ + responseObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -637,11 +604,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discoveryV1beta1.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1beta1", - }, + expectObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -690,8 +653,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1beta1.EndpointSliceList: currentNode is not in any nodepool": { - path: "/apis/discovery.k8s.io/v1beta1/endpointslices", - object: &discoveryV1beta1.EndpointSliceList{ + responseObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -799,11 +761,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discoveryV1beta1.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1beta1", - }, + expectObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -836,8 +794,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1beta1.EndpointSliceList: currentNode has no endpoints on node": { - path: "/apis/discovery.k8s.io/v1beta1/endpointslices", - object: &discoveryV1beta1.EndpointSliceList{ + responseObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -948,11 +905,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discoveryV1beta1.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1beta1", - }, + expectObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -967,8 +920,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1beta1.EndpointSliceList: currentNode has no endpoints in nodepool": { - path: "/apis/discovery.k8s.io/v1beta1/endpointslices", - object: &discoveryV1beta1.EndpointSliceList{ + responseObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1079,11 +1031,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discoveryV1beta1.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1beta1", - }, + expectObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1098,8 +1046,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1beta1.EndpointSliceList: no service info in endpointslice": { - path: "/apis/discovery.k8s.io/v1beta1/endpointslices", - object: &discoveryV1beta1.EndpointSliceList{ + responseObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1207,11 +1154,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discoveryV1beta1.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1beta1", - }, + expectObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1257,8 +1200,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointSliceList: topologyKeys is kubernetes.io/hostname": { - path: "/apis/discovery.k8s.io/v1/endpointslices", - object: &discovery.EndpointSliceList{ + responseObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1361,11 +1303,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discovery.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1", - }, + expectObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1394,8 +1332,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointSliceList: topologyKeys is openyurt.io/nodepool": { - path: "/apis/discovery.k8s.io/v1/endpointslices", - object: &discovery.EndpointSliceList{ + responseObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1498,11 +1435,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discovery.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1", - }, + expectObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1537,8 +1470,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointSliceList: topologyKeys is kubernetes.io/zone": { - path: "/apis/discovery.k8s.io/v1/endpointslices", - object: &discovery.EndpointSliceList{ + responseObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1641,11 +1573,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discovery.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1", - }, + expectObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1680,8 +1608,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointSliceList: without openyurt.io/topologyKeys": { - path: "/apis/discovery.k8s.io/v1/endpointslices", - object: &discovery.EndpointSliceList{ + responseObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1782,11 +1709,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discovery.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1", - }, + expectObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1827,8 +1750,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointSliceList: currentNode is not in any nodepool": { - path: "/apis/discovery.k8s.io/v1/endpointslices", - object: &discovery.EndpointSliceList{ + responseObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1928,11 +1850,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discovery.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1", - }, + expectObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -1961,8 +1879,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointSliceList: currentNode has no endpoints on node": { - path: "/apis/discovery.k8s.io/v1/endpointslices", - object: &discovery.EndpointSliceList{ + responseObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -2053,11 +1970,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discovery.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1", - }, + expectObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -2072,8 +1985,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointSliceList: currentNode has no endpoints in nodePool": { - path: "/apis/discovery.k8s.io/v1/endpointslices", - object: &discovery.EndpointSliceList{ + responseObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -2164,11 +2076,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &discovery.EndpointSliceList{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSliceList", - APIVersion: "discovery.k8s.io/v1", - }, + expectObject: &discovery.EndpointSliceList{ Items: []discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -2183,8 +2091,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointsList: topologyKeys is kubernetes.io/hostname": { - path: "/api/v1/endpoints", - object: &corev1.EndpointsList{ + responseObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -2284,7 +2191,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &corev1.EndpointsList{ + expectObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -2314,8 +2221,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointsList: topologyKeys is openyurt.io/nodepool": { - path: "/api/v1/endpoints", - object: &corev1.EndpointsList{ + responseObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -2415,7 +2321,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &corev1.EndpointsList{ + expectObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -2449,8 +2355,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointsList: topologyKeys is kubernetes.io/zone": { - path: "/api/v1/endpoints", - object: &corev1.EndpointsList{ + responseObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -2550,7 +2455,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &corev1.EndpointsList{ + expectObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -2584,8 +2489,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointsList: without openyurt.io/topologyKeys": { - path: "/api/v1/endpoints", - object: &corev1.EndpointsList{ + responseObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -2683,7 +2587,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &corev1.EndpointsList{ + expectObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -2721,8 +2625,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointsList: currentNode is not in any nodepool": { - path: "/api/v1/endpoints", - object: &corev1.EndpointsList{ + responseObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -2819,7 +2722,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &corev1.EndpointsList{ + expectObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -2849,8 +2752,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointsList: currentNode has no endpoints on node": { - path: "/api/v1/endpoints", - object: &corev1.EndpointsList{ + responseObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -2942,7 +2844,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &corev1.EndpointsList{ + expectObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -2958,8 +2860,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointsList: currentNode has no endpoints in nodepool": { - path: "/api/v1/endpoints", - object: &corev1.EndpointsList{ + responseObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -3051,7 +2952,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &corev1.EndpointsList{ + expectObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -3067,8 +2968,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.EndpointsList: unknown openyurt.io/topologyKeys": { - path: "/api/v1/endpoints", - object: &corev1.EndpointsList{ + responseObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -3160,7 +3060,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &corev1.EndpointsList{ + expectObject: &corev1.EndpointsList{ TypeMeta: metav1.TypeMeta{ Kind: "EndpointsList", APIVersion: "v1", @@ -3190,8 +3090,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, "v1.Pod: un-recognized object for filter": { - path: "/api/v1/pods", - object: &corev1.Pod{ + responseObject: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod1", Namespace: "default", @@ -3261,11 +3160,7 @@ func TestServiceTopologyHandler(t *testing.T) { }, }, ), - expectResult: &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, + expectObject: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod1", Namespace: "default", @@ -3274,12 +3169,8 @@ func TestServiceTopologyHandler(t *testing.T) { }, } - resolver := newTestRequestInfoResolver() - sw := serializer.NewSerializerManager() for k, tt := range testcases { t.Run(k, func(t *testing.T) { - //tt.kubeClient.DiscoveryV1beta1().EndpointSlices("default").Create(context.TODO(), tt.endpointSlice, metav1.CreateOptions{}) - factory := informers.NewSharedInformerFactory(tt.kubeClient, 24*time.Hour) serviceInformer := factory.Core().V1().Services() serviceInformer.Informer() @@ -3303,36 +3194,13 @@ func TestServiceTopologyHandler(t *testing.T) { return tt.kubeClient.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{}) } - req, err := http.NewRequest("GET", tt.path, nil) - if err != nil { - t.Errorf("failed to create request, %v", err) + fh := NewServiceTopologyFilterHandler(currentNodeName, serviceLister, nodePoolLister, nodeGetter) + newObj, isNil := fh.RuntimeObjectFilter(tt.responseObject) + if isNil { + t.Errorf("empty object is returned") } - req.RemoteAddr = "127.0.0.1" - req.Header.Set("Accept", "application/json") - - var handledObject runtime.Object - var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - ctx := req.Context() - reqContentType, _ := hubutil.ReqContentTypeFrom(ctx) - ctx = hubutil.WithRespContentType(ctx, reqContentType) - req = req.WithContext(ctx) - s := filterutil.CreateSerializer(req, sw) - if s == nil { - t.Fatalf("failed to create serializer, %v", s) - } - - fh := NewServiceTopologyFilterHandler(currentNodeName, s, serviceLister, nodePoolLister, nodeGetter) - inputB, _ := s.Encode(tt.object) - filteredB, _ := fh.ObjectResponseFilter(inputB) - handledObject, _ = s.Decode(filteredB) - }) - - handler = util.WithRequestContentType(handler) - handler = filters.WithRequestInfo(handler, resolver) - handler.ServeHTTP(httptest.NewRecorder(), req) - - if !reflect.DeepEqual(handledObject, tt.expectResult) { - t.Errorf("serviceTopologyHandler expect: \n%#+v\nbut got: \n%#+v\n", tt.expectResult, handledObject) + if !reflect.DeepEqual(newObj, tt.expectObject) { + t.Errorf("serviceTopologyHandler expect: \n%#+v\nbut got: \n%#+v\n", tt.expectObject, newObj) } }) } diff --git a/pkg/yurthub/filter/util/utils.go b/pkg/yurthub/filter/util/utils.go deleted file mode 100644 index 2919cbc450d..00000000000 --- a/pkg/yurthub/filter/util/utils.go +++ /dev/null @@ -1,38 +0,0 @@ -/* -Copyright 2021 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "net/http" - - apirequest "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/klog/v2" - - "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" - "github.com/openyurtio/openyurt/pkg/yurthub/util" -) - -func CreateSerializer(req *http.Request, sm *serializer.SerializerManager) *serializer.Serializer { - ctx := req.Context() - respContentType, _ := util.RespContentTypeFrom(ctx) - info, _ := apirequest.RequestInfoFrom(ctx) - if respContentType == "" || info == nil || info.APIVersion == "" || info.Resource == "" { - klog.Infof("CreateSerializer failed , info is :%+v", info) - return nil - } - return sm.CreateSerializer(respContentType, info.APIGroup, info.APIVersion, info.Resource) -}