Skip to content

Commit

Permalink
Use configmap to configure the data source of filter framework
Browse files Browse the repository at this point in the history
  • Loading branch information
yingjianjian committed Mar 21, 2022
1 parent fa65aa3 commit bb5e972
Show file tree
Hide file tree
Showing 20 changed files with 428 additions and 208 deletions.
36 changes: 19 additions & 17 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/ingresscontroller"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/endpointsfilter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
Expand Down Expand Up @@ -87,7 +87,7 @@ type YurtHubConfiguration struct {
YurtSharedFactory yurtinformers.SharedInformerFactory
WorkingMode util.WorkingMode
KubeletHealthGracePeriod time.Duration
FilterChain filter.Interface
FilterManager *filter.Manager
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -120,17 +120,17 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
proxySecureServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxySecurePort)
workingMode := util.WorkingMode(options.WorkingMode)

var filterChain filter.Interface
var filterMapping map[string]filter.Runner
var filters *filter.Filters
var serviceTopologyFilterEnabled bool
var mutatedMasterServiceAddr string
var filterManager *filter.Manager
if options.EnableResourceFilter {
if options.WorkingMode == string(util.WorkingModeCloud) {
options.DisabledResourceFilters = append(options.DisabledResourceFilters, filter.DisabledInCloudMode...)
}
filters = filter.NewFilters(options.DisabledResourceFilters)
registerAllFilters(filters)

serviceTopologyFilterEnabled = filters.Enabled(filter.ServiceTopologyFilterName)
mutatedMasterServiceAddr = us[0].Host
if options.AccessServerThroughHub {
Expand All @@ -147,11 +147,15 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
return nil, err
}
registerInformers(sharedFactory, yurtSharedFactory, workingMode, serviceTopologyFilterEnabled, options.NodePoolName, options.NodeName)
filterChain, err = createFilterChain(filters, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, workingMode, options.NodeName, mutatedMasterServiceAddr)
filterMapping, err = generateNameToFilterMapping(filters, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, workingMode, options.NodeName, mutatedMasterServiceAddr)
if err != nil {
return nil, err
}

if options.EnableResourceFilter {
filterManager = filter.NewFilterManager(sharedFactory, filterMapping)
}

cfg := &YurtHubConfiguration{
LBMode: options.LBMode,
RemoteServers: us,
Expand Down Expand Up @@ -183,7 +187,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
SharedFactory: sharedFactory,
YurtSharedFactory: yurtSharedFactory,
KubeletHealthGracePeriod: options.KubeletHealthGracePeriod,
FilterChain: filterChain,
FilterManager: filterManager,
}

return cfg, nil
Expand Down Expand Up @@ -272,15 +276,13 @@ func registerInformers(informerFactory informers.SharedInformerFactory,
}
}

if workingMode == util.WorkingModeEdge {
newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
}
return coreinformers.NewFilteredConfigMapInformer(client, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions)
newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
}
informerFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer)
return coreinformers.NewFilteredConfigMapInformer(client, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions)
}
informerFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer)
}

// registerAllFilters by order, the front registered filter will be
Expand All @@ -289,17 +291,17 @@ func registerAllFilters(filters *filter.Filters) {
servicetopology.Register(filters)
masterservice.Register(filters)
discardcloudservice.Register(filters)
ingresscontroller.Register(filters)
endpointsfilter.Register(filters)
}

// createFilterChain return union filters that initializations completed.
func createFilterChain(filters *filter.Filters,
// generateNameToFilterMapping return union filters that initializations completed.
func generateNameToFilterMapping(filters *filter.Filters,
sharedFactory informers.SharedInformerFactory,
yurtSharedFactory yurtinformers.SharedInformerFactory,
serializerManager *serializer.SerializerManager,
storageWrapper cachemanager.StorageWrapper,
workingMode util.WorkingMode,
nodeName, mutatedMasterServiceAddr string) (filter.Interface, error) {
nodeName, mutatedMasterServiceAddr string) (map[string]filter.Runner, error) {
if filters == nil {
return nil, nil
}
Expand Down
4 changes: 4 additions & 0 deletions config/setup/yurthub-cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ metadata:
namespace: kube-system
data:
cache_agents: ""
filter_endpoints: coredns/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""
filter_masterservice: ""
4 changes: 4 additions & 0 deletions config/yaml-template/yurthub-cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ metadata:
namespace: kube-system
data:
cache_agents: ""
filter_endpoints: coredns/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""
filter_masterservice: ""
7 changes: 6 additions & 1 deletion pkg/yurtctl/util/edgenode/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,10 @@ metadata:
name: yurt-hub-cfg
namespace: kube-system
data:
cache_agents: ""`
cache_agents: ""
filter_endpoints: coredns/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""
filter_masterservice: ""
`
)
211 changes: 198 additions & 13 deletions pkg/yurthub/filter/approver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,211 @@ limitations under the License.

package filter

import "k8s.io/apimachinery/pkg/util/sets"
import (
"fmt"
"net/http"
"strings"
"sync"

type Approver struct {
comp string
resource string
operations sets.String
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

type requestInfo struct {
comp string
resource string
verbs sets.String
}

type approver struct {
sync.Mutex
nameToRequests map[string][]*requestInfo
whiteListRequests []*requestInfo
configMapSynced cache.InformerSynced
stopCh chan struct{}
}

func NewApprover(comp, resource string, verbs ...string) *Approver {
return &Approver{
comp: comp,
resource: resource,
operations: sets.NewString(verbs...),
var (
defaultWhiteListRequests = []*requestInfo{
{
comp: projectinfo.GetHubName(),
resource: "configmaps",
verbs: sets.NewString("list", "watch"),
},
}
defaultFilterCfg = map[string]string{
MasterServiceFilterName: "kubelet/services#list;watch",
EndpointsFilterName: "nginx-ingress-controller/endpoints#list;watch",
DiscardCloudServiceFilterName: "kube-proxy/services#list;watch",
ServiceTopologyFilterName: "kube-proxy/endpointslices#list;watch",
}
)

func newApprover(sharedFactory informers.SharedInformerFactory) *approver {
configMapInformer := sharedFactory.Core().V1().ConfigMaps().Informer()
na := &approver{
nameToRequests: make(map[string][]*requestInfo),
configMapSynced: configMapInformer.HasSynced,
whiteListRequests: make([]*requestInfo, 0),
stopCh: make(chan struct{}),
}
na.whiteListRequests = append(na.whiteListRequests, defaultWhiteListRequests...)
configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: na.addConfigMap,
UpdateFunc: na.updateConfigMap,
})
return na
}

func (a *Approver) Approve(comp, resource, verb string) bool {
if a.comp != comp || a.resource != resource {
func (a *approver) Approve(comp, resource, verb string) bool {
if a.isWhitelistReq(comp, resource, verb) {
return false
}
if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok {
panic("wait for configMap cache sync timeout")
}
a.Lock()
defer a.Unlock()
for _, requests := range a.nameToRequests {
for _, request := range requests {
if request.Equal(comp, resource, verb) {

return true
}
}
}
return false
}

func (a *approver) GetFilterName(req *http.Request) string {
ctx := req.Context()
comp, ok := util.ClientComponentFrom(ctx)
if !ok {
return ""
}

info, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
return ""
}
a.Lock()
defer a.Unlock()
for name, requests := range a.nameToRequests {
for _, request := range requests {
if request.Equal(comp, info.Resource, info.Verb) {
return name
}
}
}
return ""
}

// Determine whether it is a whitelist resource
func (a *approver) isWhitelistReq(comp, resource, verb string) bool {
for _, req := range a.whiteListRequests {
if req.Equal(comp, resource, verb) {
return true
}
}
return false
}

// Parse the special format of filter config: filter_{name}: user-agent1/resource#verb1;verb2, user-agent2/resource#verb1;verb2.
func (a *approver) parseFilterConfig(cfg string, filterType string) []*requestInfo {
var reqs []*requestInfo
cfg = a.mergeFilterConfig(cfg, filterType)
for _, configArr := range strings.Split(cfg, ",") {
cfg := strings.Split(configArr, "#")
if len(cfg) != 2 {
continue
}

v := strings.Split(cfg[0], "/")
if len(v) != 2 {
continue
}

req := &requestInfo{
comp: v[0],
resource: v[1],
verbs: sets.NewString(strings.Split(cfg[1], ";")...),
}
reqs = append(reqs, req)
}
return reqs
}

// merge default filter to custom filter
func (a *approver) mergeFilterConfig(cfg, filterType string) string {
if config, ok := defaultFilterCfg[filterType]; ok {
if len(cfg) != 0 {
return fmt.Sprintf("%v,%v", config, cfg)
}
}

return cfg
}

func (a *approver) addConfigMap(obj interface{}) {
cfg, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}
for key, value := range cfg.Data {
if strings.HasPrefix(key, "filter_") {
a.updateYurtHubFilterCfg(key, value, "add")
}
}
}

func (a *approver) updateConfigMap(oldObj, newObj interface{}) {
oldCfg, ok := oldObj.(*corev1.ConfigMap)
if !ok {
return
}

newCfg, ok := newObj.(*corev1.ConfigMap)
if !ok {
return
}

for key, value := range newCfg.Data {
if _, ok := oldCfg.Data[key]; !ok {
if strings.HasPrefix(key, "filter_") {
a.updateYurtHubFilterCfg(key, value, "update")
continue
}
}

if oldCfg.Data[key] != value {
if strings.HasPrefix(key, "filter_") {
a.updateYurtHubFilterCfg(key, value, "update")
}
}
}
}

// update filter cfg
func (a *approver) updateYurtHubFilterCfg(filterCfgKey, filterCfgValue, action string) {
a.Lock()
defer a.Unlock()
s := strings.Split(filterCfgKey, "_")
a.nameToRequests[s[1]] = a.parseFilterConfig(filterCfgValue, s[1])
klog.Infof("current filter config: %v after %s", a.nameToRequests, action)
}

// Judge whether the request is allowed to be filtered
func (req *requestInfo) Equal(comp, resource, verb string) bool {
if req.comp == comp && req.resource == resource && req.verbs.Has(verb) {
return true
}

return a.operations.Has(verb)
return false
}
Loading

0 comments on commit bb5e972

Please sign in to comment.