Skip to content

Commit

Permalink
enhancement: add filter name info into log message. (openyurtio#455)
Browse files Browse the repository at this point in the history
  • Loading branch information
rambohe-ch authored Sep 8, 2021
1 parent 8c2c1ac commit 2a3655b
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 23 deletions.
3 changes: 1 addition & 2 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/kubelet"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
"github.com/openyurtio/openyurt/pkg/yurthub/gc"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
Expand Down Expand Up @@ -167,7 +166,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
}

// start shared informers here
if filterChain != nil && cfg.Filters.Enabled(servicetopology.FilterName) {
if filterChain != nil && cfg.Filters.Enabled(filter.ServiceTopologyFilterName) {
cfg.SharedFactory.Start(stopCh)
cfg.YurtSharedFactory.Start(stopCh)
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/yurthub/filter/constant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filter

const (
// masterservice filter is used to mutate the ClusterIP and https port of default/kubernetes service
// in order to pods on edge nodes can access kube-apiserver directly by inClusterConfig.
MasterServiceFilterName = "masterservice"

// servicetopology filter is used to reassemble endpointslice in order to make the service traffic
// under the topology that defined by service.Annotation["openyurt.io/topologyKeys"]
ServiceTopologyFilterName = "servicetopology"

// discardcloudservice filter is used to discard cloud service(like loadBalancer service)
// on kube-proxy list/watch service request from edge nodes.
DiscardCloudServiceFilterName = "discardcloudservice"
)
8 changes: 2 additions & 6 deletions pkg/yurthub/filter/discardcloudservice/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,9 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
)

// discardcloudservice filter is used to discard cloud service(like loadBalancer service)
// on kube-proxy list/watch service request from edge nodes.
const FilterName = "discardcloudservice"

// Register registers a filter
func Register(filters *filter.Filters) {
filters.Register(FilterName, func() (filter.Interface, error) {
filters.Register(filter.DiscardCloudServiceFilterName, func() (filter.Interface, error) {
return NewFilter(), nil
})
}
Expand Down Expand Up @@ -62,5 +58,5 @@ func (sf *discardCloudServiceFilter) Filter(req *http.Request, rc io.ReadCloser,
}

handler := NewDiscardCloudServiceFilterHandler(s)
return filter.NewFilterReadCloser(req, rc, handler, s, stopCh)
return filter.NewFilterReadCloser(req, rc, handler, s, filter.DiscardCloudServiceFilterName, stopCh)
}
9 changes: 6 additions & 3 deletions pkg/yurthub/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type filterReadCloser struct {
handler Handler
isWatch bool
serializer *serializer.Serializer
ownerName string
stopCh <-chan struct{}
}

Expand All @@ -132,6 +133,7 @@ func NewFilterReadCloser(
rc io.ReadCloser,
handler Handler,
serializer *serializer.Serializer,
ownerName string,
stopCh <-chan struct{}) (int, io.ReadCloser, error) {

ctx := req.Context()
Expand All @@ -144,14 +146,15 @@ func NewFilterReadCloser(
handler: handler,
isWatch: info.Verb == "watch",
serializer: serializer,
ownerName: ownerName,
stopCh: stopCh,
}

if dr.isWatch {
go func(req *http.Request, rc io.ReadCloser, ch chan watch.Event) {
err := handler.StreamResponseFilter(rc, ch)
if err != nil && err != io.EOF && err != context.Canceled {
klog.Errorf("filter watch response ended with error, %v", err)
klog.Errorf("filter(%s) watch response ended with error, %v", dr.ownerName, err)
}
}(req, rc, dr.ch)
return 0, dr, nil
Expand Down Expand Up @@ -180,12 +183,12 @@ func (dr *filterReadCloser) Read(p []byte) (int, error) {
buf := &bytes.Buffer{}
n, err := dr.serializer.WatchEncode(buf, &watchEvent)
if err != nil {
klog.Errorf("failed to encode resource in Reader %v", err)
klog.Errorf("filter(%s) failed to encode resource in Reader %v", dr.ownerName, err)
return 0, err
}
copied := copy(p, buf.Bytes())
if copied != n {
return 0, fmt.Errorf("expect copy %d bytes, but only %d bytes copyied", n, copied)
return 0, fmt.Errorf("filter(%s) expect copy %d bytes, but only %d bytes copyied", dr.ownerName, n, copied)
}

return n, nil
Expand Down
8 changes: 2 additions & 6 deletions pkg/yurthub/filter/masterservice/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,9 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
)

// masterservice filter is used to mutate the ClusterIP and https port of default/kubernetes service
// in order to pods on edge nodes can access kube-apiserver directly by inClusterConfig.
const FilterName = "masterservice"

// Register registers a filter
func Register(filters *filter.Filters) {
filters.Register(FilterName, func() (filter.Interface, error) {
filters.Register(filter.MasterServiceFilterName, func() (filter.Interface, error) {
return NewFilter(), nil
})
}
Expand Down Expand Up @@ -90,5 +86,5 @@ func (msf *masterServiceFilter) Filter(req *http.Request, rc io.ReadCloser, stop
}

handler := NewMasterServiceFilterHandler(req, s, msf.host, msf.port)
return filter.NewFilterReadCloser(req, rc, handler, s, stopCh)
return filter.NewFilterReadCloser(req, rc, handler, s, filter.MasterServiceFilterName, stopCh)
}
8 changes: 2 additions & 6 deletions pkg/yurthub/filter/servicetopology/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,9 @@ import (
"k8s.io/klog"
)

// servicetopology filter is used to reassemble endpointslice in order to make the service traffic
// under the topology that defined by service.Annotation["openyurt.io/topologyKeys"]
const FilterName = "servicetopology"

// Register registers a filter
func Register(filters *filter.Filters) {
filters.Register(FilterName, func() (filter.Interface, error) {
filters.Register(filter.ServiceTopologyFilterName, func() (filter.Interface, error) {
return NewFilter(), nil
})
}
Expand Down Expand Up @@ -148,5 +144,5 @@ func (ssf *serviceTopologyFilter) Filter(req *http.Request, rc io.ReadCloser, st
}

handler := NewServiceTopologyFilterHandler(ssf.nodeName, s, ssf.serviceLister, ssf.nodepoolLister, ssf.nodeGetter)
return filter.NewFilterReadCloser(req, rc, handler, s, stopCh)
return filter.NewFilterReadCloser(req, rc, handler, s, filter.ServiceTopologyFilterName, stopCh)
}

0 comments on commit 2a3655b

Please sign in to comment.