Skip to content

Commit

Permalink
1. bugfix: StreamResponseFilter of data filter framework can't work i…
Browse files Browse the repository at this point in the history
…f size of one object is over 32KB (#1066)

2. improve data filter framework
  - define ObjectHandler interface for filtering Runtime.Object, and other common routines are removed to FilterReadCloser
  - add SerializerManager as common parameter for all filters.
  • Loading branch information
rambohe-ch authored Nov 21, 2022
1 parent d5ffeda commit 775d69d
Show file tree
Hide file tree
Showing 15 changed files with 984 additions and 1,125 deletions.
26 changes: 7 additions & 19 deletions pkg/yurthub/filter/discardcloudservice/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ import (
"net/http"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/filter"
filterutil "github.com/openyurtio/openyurt/pkg/yurthub/filter/util"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
)

// Register registers a filter
func Register(filters *filter.Filters) {
func Register(filters *filter.Filters, sm *serializer.SerializerManager) {
filters.Register(filter.DiscardCloudServiceFilterName, func() (filter.Runner, error) {
return NewFilter(), nil
return NewFilter(sm), nil
})
}

func NewFilter() *discardCloudServiceFilter {
return &discardCloudServiceFilter{}
func NewFilter(sm *serializer.SerializerManager) *discardCloudServiceFilter {
return &discardCloudServiceFilter{
serializerManager: sm,
}
}

type discardCloudServiceFilter struct {
Expand All @@ -53,18 +53,6 @@ func (sf *discardCloudServiceFilter) SupportedResourceAndVerbs() map[string]sets
}
}

func (sf *discardCloudServiceFilter) SetSerializerManager(s *serializer.SerializerManager) error {
sf.serializerManager = s
return nil
}

func (sf *discardCloudServiceFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) {
s := filterutil.CreateSerializer(req, sf.serializerManager)
if s == nil {
klog.Errorf("skip filter, failed to create serializer in discardCloudServiceFilter")
return 0, rc, nil
}

handler := NewDiscardCloudServiceFilterHandler(s)
return filter.NewFilterReadCloser(req, rc, handler, s, filter.DiscardCloudServiceFilterName, stopCh)
return filter.NewFilterReadCloser(req, sf.serializerManager, rc, NewDiscardCloudServiceFilterHandler(), sf.Name(), stopCh)
}
114 changes: 36 additions & 78 deletions pkg/yurthub/filter/discardcloudservice/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ package discardcloudservice

import (
"fmt"
"io"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
)

var (
Expand All @@ -34,91 +32,51 @@ var (
}
)

type discardCloudServiceFilterHandler struct {
serializer *serializer.Serializer
}
type discardCloudServiceFilterHandler struct{}

func NewDiscardCloudServiceFilterHandler(serializer *serializer.Serializer) filter.Handler {
return &discardCloudServiceFilterHandler{
serializer: serializer,
}
func NewDiscardCloudServiceFilterHandler() filter.ObjectHandler {
return &discardCloudServiceFilterHandler{}
}

// ObjectResponseFilter remove the cloud service(like LoadBalancer service) from response object
func (fh *discardCloudServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
list, err := fh.serializer.Decode(b)
if err != nil || list == nil {
klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of discardCloudServiceFilterHandler %v", err)
return b, nil
}

serviceList, ok := list.(*v1.ServiceList)
if ok {
// RuntimeObjectFilter remove the cloud service(like LoadBalancer service) from response object
func (fh *discardCloudServiceFilterHandler) RuntimeObjectFilter(obj runtime.Object) (runtime.Object, bool) {
switch v := obj.(type) {
case *v1.ServiceList:
var svcNew []v1.Service
for i := range serviceList.Items {
nsName := fmt.Sprintf("%s/%s", serviceList.Items[i].Namespace, serviceList.Items[i].Name)
// remove lb service
if serviceList.Items[i].Spec.Type == v1.ServiceTypeLoadBalancer {
if serviceList.Items[i].Annotations[filter.SkipDiscardServiceAnnotation] != "true" {
klog.V(2).Infof("load balancer service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
continue
}
}

// remove cloud clusterIP service
if _, ok := cloudClusterIPService[nsName]; ok {
klog.V(2).Infof("clusterIP service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
continue
for i := range v.Items {
svc := discardCloudService(&v.Items[i])
if svc != nil {
svcNew = append(svcNew, *svc)
}

svcNew = append(svcNew, serviceList.Items[i])
}
serviceList.Items = svcNew
return fh.serializer.Encode(serviceList)
v.Items = svcNew
return v, false
case *v1.Service:
svc := discardCloudService(v)
if svc == nil {
return svc, true
}
return svc, false
default:
return v, false
}

return b, nil
}

// StreamResponseFilter filter the cloud service(like LoadBalancer service) from watch stream response
func (fh *discardCloudServiceFilterHandler) StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error {
defer func() {
close(ch)
}()

d, err := fh.serializer.WatchDecoder(rc)
if err != nil {
klog.Errorf("StreamResponseFilter for discardCloudServiceFilterHandler ended with error, %v", err)
return err
}

for {
watchType, obj, err := d.Decode()
if err != nil {
return err
}

service, ok := obj.(*v1.Service)
if ok {
nsName := fmt.Sprintf("%s/%s", service.Namespace, service.Name)
// remove cloud LoadBalancer service
if service.Spec.Type == v1.ServiceTypeLoadBalancer {
if service.Annotations[filter.SkipDiscardServiceAnnotation] != "true" {
klog.V(2).Infof("load balancer service(%s) is discarded in StreamResponseFilter of discardCloudServiceFilterHandler", nsName)
continue
}
}

// remove cloud clusterIP service
if _, ok := cloudClusterIPService[nsName]; ok {
klog.V(2).Infof("clusterIP service(%s) is discarded in StreamResponseFilter of discardCloudServiceFilterHandler", nsName)
continue
}
func discardCloudService(svc *v1.Service) *v1.Service {
nsName := fmt.Sprintf("%s/%s", svc.Namespace, svc.Name)
// remove cloud LoadBalancer service
if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
if svc.Annotations[filter.SkipDiscardServiceAnnotation] != "true" {
klog.V(2).Infof("load balancer service(%s) is discarded in StreamResponseFilter of discardCloudServiceFilterHandler", nsName)
return nil
}
}

var wEvent watch.Event
wEvent.Type = watchType
wEvent.Object = obj
ch <- wEvent
// remove cloud clusterIP service
if _, ok := cloudClusterIPService[nsName]; ok {
klog.V(2).Infof("clusterIP service(%s) is discarded in StreamResponseFilter of discardCloudServiceFilterHandler", nsName)
return nil
}

return svc
}
Loading

0 comments on commit 775d69d

Please sign in to comment.