From 4302579ec6b8beeb3a7fc13941928203fb2c3044 Mon Sep 17 00:00:00 2001
From: rambohe-ch
Date: Thu, 18 Nov 2021 15:55:54 +0800
Subject: [PATCH] 1. fix service topology not working when hub agent works in
 cloud mode --> solution: service topology filter will list/watch the node
 from kube-apiserver if the working mode is cloud fixes #600 2. optimize
 shared informer registration: extract all informer registration into a
 common function named registerInformers

---
 cmd/yurthub/app/config/config.go              | 103 +++++++++++++++---
 cmd/yurthub/app/start.go                      |  30 +----
 pkg/yurthub/cachemanager/cache_agent.go       |  92 ++++++----------
 pkg/yurthub/cachemanager/cache_agent_test.go  |   2 +-
 pkg/yurthub/cachemanager/cache_manager.go     |   2 -
 pkg/yurthub/filter/initializer/initializer.go |  19 +++-
 pkg/yurthub/filter/interfaces.go              |   2 +-
 pkg/yurthub/filter/masterservice/filter.go    |   8 --
 pkg/yurthub/filter/servicetopology/filter.go  |  32 +++++-
 pkg/yurthub/filter/servicetopology/handler.go |   2 +-
 pkg/yurthub/proxy/proxy.go                    |   4 +-
 11 files changed, 169 insertions(+), 127 deletions(-)

diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go
index d2660ca0ee1..1a2f9b8e1fa 100644
--- a/cmd/yurthub/app/config/config.go
+++ b/cmd/yurthub/app/config/config.go
@@ -29,20 +29,26 @@ import (
 	"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
 	"github.com/openyurtio/openyurt/pkg/yurthub/filter"
 	"github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice"
+	"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
 	"github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice"
 	"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
 	"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
 	"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
 	"github.com/openyurtio/openyurt/pkg/yurthub/storage/factory"
 	"github.com/openyurtio/openyurt/pkg/yurthub/util"
+	yurtcorev1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1"
 	yurtclientset "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned"
 	yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions"
+	yurtv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions/apps/v1alpha1"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/informers"
+	coreinformers "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/klog"
 )
@@ -75,12 +81,11 @@ type YurtHubConfiguration struct {
 	SerializerManager        *serializer.SerializerManager
 	RESTMapperManager        *meta.RESTMapperManager
 	TLSConfig                *tls.Config
-	MutatedMasterServiceAddr string
-	Filters                  *filter.Filters
 	SharedFactory            informers.SharedInformerFactory
 	YurtSharedFactory        yurtinformers.SharedInformerFactory
 	WorkingMode              util.WorkingMode
 	KubeletHealthGracePeriod time.Duration
+	FilterChain              filter.Interface
 }
 
 // Complete converts *options.YurtHubOptions to *YurtHubConfiguration
@@ -104,12 +109,11 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
 	proxySecureServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubProxySecurePort)
 	proxyServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxyPort)
 	proxySecureServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxySecurePort)
-	sharedFactory, yurtSharedFactory, err := createSharedInformers(fmt.Sprintf("http://%s", proxyServerAddr), options.NodePoolName)
-	if err != nil {
-		return nil, err
-	}
+	workingMode := util.WorkingMode(options.WorkingMode)
 
+	var filterChain filter.Interface
 	var filters *filter.Filters
+	var serviceTopologyFilterEnabled bool
 	var mutatedMasterServiceAddr string
 	if options.EnableResourceFilter {
 		if options.WorkingMode == string(util.WorkingModeCloud) {
@@ -118,6 +122,7 @@
 
 		filters = filter.NewFilters(options.DisabledResourceFilters)
 		registerAllFilters(filters)
+		serviceTopologyFilterEnabled = filters.Enabled(filter.ServiceTopologyFilterName)
 
 		mutatedMasterServiceAddr = us[0].Host
 		if options.AccessServerThroughHub {
 			if options.EnableDummyIf {
@@ -128,6 +133,16 @@
 		}
 	}
 
+	sharedFactory, yurtSharedFactory, err := createSharedInformers(fmt.Sprintf("http://%s", proxyServerAddr))
+	if err != nil {
+		return nil, err
+	}
+	registerInformers(sharedFactory, yurtSharedFactory, workingMode, serviceTopologyFilterEnabled, options.NodePoolName, options.NodeName)
+	filterChain, err = createFilterChain(filters, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, workingMode, options.NodeName, mutatedMasterServiceAddr)
+	if err != nil {
+		return nil, err
+	}
+
 	cfg := &YurtHubConfiguration{
 		LBMode:        options.LBMode,
 		RemoteServers: us,
@@ -151,15 +166,14 @@
 		EnableDummyIf:            options.EnableDummyIf,
 		EnableIptables:           options.EnableIptables,
 		HubAgentDummyIfName:      options.HubAgentDummyIfName,
-		WorkingMode:              util.WorkingMode(options.WorkingMode),
+		WorkingMode:              workingMode,
 		StorageWrapper:           storageWrapper,
 		SerializerManager:        serializerManager,
 		RESTMapperManager:        restMapperManager,
-		MutatedMasterServiceAddr: mutatedMasterServiceAddr,
-		Filters:                  filters,
 		SharedFactory:            sharedFactory,
 		YurtSharedFactory:        yurtSharedFactory,
 		KubeletHealthGracePeriod: options.KubeletHealthGracePeriod,
+		FilterChain:              filterChain,
 	}
 
 	return cfg, nil
@@ -196,7 +210,7 @@ func parseRemoteServers(serverAddr string) ([]*url.URL, error) {
 }
 
 // createSharedInformers create sharedInformers from the given proxyAddr.
-func createSharedInformers(proxyAddr, nodePoolName string) (informers.SharedInformerFactory, yurtinformers.SharedInformerFactory, error) {
+func createSharedInformers(proxyAddr string) (informers.SharedInformerFactory, yurtinformers.SharedInformerFactory, error) {
 	var kubeConfig *rest.Config
 	var err error
 	kubeConfig, err = clientcmd.BuildConfigFromFlags(proxyAddr, "")
@@ -214,20 +228,73 @@ func createSharedInformers(proxyAddr string) (informers.SharedInfo
 		return nil, nil, err
 	}
 
-	if len(nodePoolName) == 0 {
-		return informers.NewSharedInformerFactory(client, 24*time.Hour), yurtinformers.NewSharedInformerFactory(yurtClient, 24*time.Hour), nil
+	return informers.NewSharedInformerFactory(client, 24*time.Hour),
+		yurtinformers.NewSharedInformerFactory(yurtClient, 24*time.Hour), nil
+}
+
+// registerInformers registers the node/nodePool/configmap informers
+func registerInformers(informerFactory informers.SharedInformerFactory,
+	yurtInformerFactory yurtinformers.SharedInformerFactory,
+	workingMode util.WorkingMode,
+	serviceTopologyFilterEnabled bool,
+	nodePoolName, nodeName string) {
+	// skip constructing node/nodePool informers if the service topology filter is disabled
+	if serviceTopologyFilterEnabled {
+		if workingMode == util.WorkingModeCloud {
+			newNodeInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
+				tweakListOptions := func(options *metav1.ListOptions) {
+					options.FieldSelector = fields.Set{"metadata.name": nodeName}.String()
+				}
+				return coreinformers.NewFilteredNodeInformer(client, resyncPeriod, nil, tweakListOptions)
+			}
+			informerFactory.InformerFor(&corev1.Node{}, newNodeInformer)
+		}
+
+		if len(nodePoolName) != 0 {
+			newNodePoolInformer := func(client yurtclientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
+				tweakListOptions := func(options *metav1.ListOptions) {
+					options.FieldSelector = fields.Set{"metadata.name": nodePoolName}.String()
+				}
+				return yurtv1alpha1.NewFilteredNodePoolInformer(client, resyncPeriod, nil, tweakListOptions)
+			}
+
+			yurtInformerFactory.InformerFor(&yurtcorev1alpha1.NodePool{}, newNodePoolInformer)
+		}
+	}
+
+	if workingMode == util.WorkingModeEdge {
+		newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
+			tweakListOptions := func(options *metav1.ListOptions) {
+				options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
+			}
+			return coreinformers.NewFilteredConfigMapInformer(client, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions)
+		}
+		informerFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer)
 	}
-	yurtSharedInformerFactory := yurtinformers.NewSharedInformerFactoryWithOptions(yurtClient, 24*time.Hour,
-		yurtinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
-			options.FieldSelector = fields.Set{"metadata.name": nodePoolName}.String()
-		}))
-	return informers.NewSharedInformerFactory(client, 24*time.Hour), yurtSharedInformerFactory, nil
 }
 
 // registerAllFilters by order, the front registered filter will be
-// called before the later registered ones.
+// called before the ones registered later.
 func registerAllFilters(filters *filter.Filters) {
 	servicetopology.Register(filters)
 	masterservice.Register(filters)
 	discardcloudservice.Register(filters)
 }
+
+// createFilterChain returns a filter chain whose filters have completed initialization.
+func createFilterChain(filters *filter.Filters,
+	sharedFactory informers.SharedInformerFactory,
+	yurtSharedFactory yurtinformers.SharedInformerFactory,
+	serializerManager *serializer.SerializerManager,
+	storageWrapper cachemanager.StorageWrapper,
+	workingMode util.WorkingMode,
+	nodeName, mutatedMasterServiceAddr string) (filter.Interface, error) {
+	if filters == nil {
+		return nil, nil
+	}
+
+	genericInitializer := initializer.New(sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, nodeName, mutatedMasterServiceAddr, workingMode)
+	initializerChain := filter.FilterInitializers{}
+	initializerChain = append(initializerChain, genericInitializer)
+	return filters.NewFromFilters(initializerChain)
+}
diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go
index 3d69981bc47..c4c5a972cf1 100644
--- a/cmd/yurthub/app/start.go
+++ b/cmd/yurthub/app/start.go
@@ -27,8 +27,6 @@ import (
 	"github.com/openyurtio/openyurt/pkg/yurthub/certificate"
 	"github.com/openyurtio/openyurt/pkg/yurthub/certificate/hubself"
 	"github.com/openyurtio/openyurt/pkg/yurthub/certificate/kubelet"
-	"github.com/openyurtio/openyurt/pkg/yurthub/filter"
-	"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
 	"github.com/openyurtio/openyurt/pkg/yurthub/gc"
 	"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
 	"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
@@ -158,15 +156,8 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
 	}
 	trace++
 
-	klog.Infof("%d. new filter chain for mutating response body", trace)
-	filterChain, err := createFilterChain(cfg)
-	if err != nil {
-		return fmt.Errorf("could not new filter chain, %v", err)
-	}
-	trace++
-
 	klog.Infof("%d. new reverse proxy handler for remote servers", trace)
-	yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, certManager, filterChain, stopCh)
+	yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, certManager, stopCh)
 	if err != nil {
 		return fmt.Errorf("could not create reverse proxy handler, %v", err)
 	}
@@ -183,11 +174,9 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
 		klog.Infof("%d. new %s server and begin to serve, dummy proxy server: %s, secure dummy proxy server: %s",
 			trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerDummyAddr, cfg.YurtHubProxyServerSecureDummyAddr)
 	}
 
-	// start shared informers here
-	if filterChain != nil && cfg.Filters.Enabled(filter.ServiceTopologyFilterName) {
-		cfg.SharedFactory.Start(stopCh)
-		cfg.YurtSharedFactory.Start(stopCh)
-	}
+	// start shared informers before starting the hub server
+	cfg.SharedFactory.Start(stopCh)
+	cfg.YurtSharedFactory.Start(stopCh)
 
 	klog.Infof("%d. new %s server and begin to serve, proxy server: %s, secure proxy server: %s, hub server: %s",
 		trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubProxyServerSecureAddr, cfg.YurtHubServerAddr)
 	s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler)
@@ -198,14 +187,3 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
 	klog.Infof("hub agent exited")
 	return nil
 }
-
-func createFilterChain(cfg *config.YurtHubConfiguration) (filter.Interface, error) {
-	if cfg.Filters == nil {
-		return nil, nil
-	}
-
-	genericInitializer := initializer.New(cfg.SharedFactory, cfg.YurtSharedFactory, cfg.SerializerManager, cfg.StorageWrapper, cfg.NodeName, cfg.MutatedMasterServiceAddr)
-	initializerChain := filter.FilterInitializers{}
-	initializerChain = append(initializerChain, genericInitializer)
-	return cfg.Filters.NewFromFilters(initializerChain)
-}
diff --git a/pkg/yurthub/cachemanager/cache_agent.go b/pkg/yurthub/cachemanager/cache_agent.go
index 1bbdc1925b1..dc70b046361 100644
--- a/pkg/yurthub/cachemanager/cache_agent.go
+++ b/pkg/yurthub/cachemanager/cache_agent.go
@@ -18,21 +18,16 @@ package cachemanager
 import (
 	"strings"
-	"time"
 
 	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/util/sets"
-	coreinformers "k8s.io/client-go/informers/core/v1"
-	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog"
 
 	"github.com/openyurtio/openyurt/pkg/yurthub/util"
 )
 
-var (
+const (
 	sepForAgent = ","
 )
 
@@ -40,7 +35,7 @@ func (cm *cacheManager) initCacheAgents() error {
 	if cm.sharedFactory == nil {
 		return nil
 	}
-	configmapInformer := cm.sharedFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer)
+	configmapInformer := cm.sharedFactory.Core().V1().ConfigMaps().Informer()
 	configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    cm.addConfigmap,
 		UpdateFunc: cm.updateConfigmap,
@@ -50,8 +45,37 @@
 	return nil
 }
 
-// UpdateCacheAgents update cache agents
-func (cm *cacheManager) UpdateCacheAgents(cacheAgents, action string) sets.String {
+func (cm *cacheManager) addConfigmap(obj interface{}) {
+	cfg, ok := obj.(*corev1.ConfigMap)
+	if !ok {
+		return
+	}
+
+	deletedAgents := cm.updateCacheAgents(cfg.Data[util.CacheUserAgentsKey], "add")
+	cm.deleteAgentCache(deletedAgents)
+}
+
+func (cm *cacheManager) updateConfigmap(oldObj, newObj interface{}) {
+	oldCfg, ok := oldObj.(*corev1.ConfigMap)
+	if !ok {
+		return
+	}
+
+	newCfg, ok := newObj.(*corev1.ConfigMap)
+	if !ok {
+		return
+	}
+
+	if oldCfg.Data[util.CacheUserAgentsKey] == newCfg.Data[util.CacheUserAgentsKey] {
+		return
+	}
+
+	deletedAgents := cm.updateCacheAgents(newCfg.Data[util.CacheUserAgentsKey], "update")
+	cm.deleteAgentCache(deletedAgents)
+}
+
+// updateCacheAgents updates cache agents
+func (cm *cacheManager) updateCacheAgents(cacheAgents, action string) sets.String {
 	newAgents := sets.NewString()
 	for _, agent := range strings.Split(cacheAgents, sepForAgent) {
 		agent = strings.TrimSpace(agent)
@@ -63,7 +87,6 @@ func (cm *cacheManager) updateCacheAgents(cacheAgents, action string) sets.Strin
 	cm.Lock()
 	defer cm.Unlock()
 	cm.cacheAgents = cm.cacheAgents.Delete(util.DefaultCacheAgents...)
-
 	if cm.cacheAgents.Equal(newAgents) {
 		// add default cache agents
 		cm.cacheAgents = cm.cacheAgents.Insert(util.DefaultCacheAgents...)
@@ -84,55 +107,6 @@ func (cm *cacheManager) UpdateCacheAgents(cacheAgents, action string) sets.Strin
 	return deletedAgents
 }
 
-// ListCacheAgents get all of cache agents
-func (cm *cacheManager) ListCacheAgents() []string {
-	cm.RLock()
-	defer cm.RUnlock()
-	agents := make([]string, 0)
-	for k := range cm.cacheAgents {
-		agents = append(agents, k)
-	}
-	return agents
-}
-
-func newConfigmapInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
-	selector := fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
-	tweakListOptions := func(options *metav1.ListOptions) {
-		options.FieldSelector = selector
-	}
-
-	return coreinformers.NewFilteredConfigMapInformer(cs, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions)
-}
-
-func (cm *cacheManager) addConfigmap(obj interface{}) {
-	cfg, ok := obj.(*corev1.ConfigMap)
-	if !ok {
-		return
-	}
-
-	deletedAgents := cm.UpdateCacheAgents(cfg.Data[util.CacheUserAgentsKey], "add")
-	cm.deleteAgentCache(deletedAgents)
-}
-
-func (cm *cacheManager) updateConfigmap(oldObj, newObj interface{}) {
-	oldCfg, ok := oldObj.(*corev1.ConfigMap)
-	if !ok {
-		return
-	}
-
-	newCfg, ok := newObj.(*corev1.ConfigMap)
-	if !ok {
-		return
-	}
-
-	if oldCfg.Data[util.CacheUserAgentsKey] == newCfg.Data[util.CacheUserAgentsKey] {
-		return
-	}
-
-	deletedAgents := cm.UpdateCacheAgents(newCfg.Data[util.CacheUserAgentsKey], "update")
-	cm.deleteAgentCache(deletedAgents)
-}
-
 func (cm *cacheManager) deleteAgentCache(deletedAgents sets.String) {
 	// delete cache data for deleted agents
 	if deletedAgents.Len() > 0 {
diff --git a/pkg/yurthub/cachemanager/cache_agent_test.go b/pkg/yurthub/cachemanager/cache_agent_test.go
index ac98d9e64d7..4fc43f7801e 100644
--- a/pkg/yurthub/cachemanager/cache_agent_test.go
+++ b/pkg/yurthub/cachemanager/cache_agent_test.go
@@ -64,7 +64,7 @@ func TestUpdateCacheAgents(t *testing.T) {
 			}
 
 			// add agents
-			deletedAgents := m.UpdateCacheAgents(tt.cacheAgents, "")
+			deletedAgents := m.updateCacheAgents(tt.cacheAgents, "")
 
 			if !deletedAgents.Equal(tt.deletedAgents) {
 				t.Errorf("Got deleted agents: %v, expect agents: %v", deletedAgents, tt.deletedAgents)
diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go
index 08576243eb7..a176bdfa7d7 100644
--- a/pkg/yurthub/cachemanager/cache_manager.go
+++ b/pkg/yurthub/cachemanager/cache_manager.go
@@ -52,8 +52,6 @@ import (
 type CacheManager interface {
 	CacheResponse(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) error
 	QueryCache(req *http.Request) (runtime.Object, error)
-	UpdateCacheAgents(agents, action string) sets.String
-	ListCacheAgents() []string
 	CanCacheFor(req *http.Request) bool
 	DeleteKindFor(gvr schema.GroupVersionResource) error
 }
diff --git a/pkg/yurthub/filter/initializer/initializer.go b/pkg/yurthub/filter/initializer/initializer.go
index dc3d60938a7..97cb98e2bab 100644
--- a/pkg/yurthub/filter/initializer/initializer.go
+++ b/pkg/yurthub/filter/initializer/initializer.go
@@ -23,6 +23,7 @@ import (
 	"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
 	"github.com/openyurtio/openyurt/pkg/yurthub/filter"
 	"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
+	"github.com/openyurtio/openyurt/pkg/yurthub/util"
 )
 
 // WantsSharedInformerFactory is an interface for setting SharedInformerFactory
@@ -40,7 +41,7 @@ type WantsNodeName interface {
 	SetNodeName(nodeName string) error
 }
 
-// WantsNodeName is an interface for setting node name
+// WantsSerializerManager is an interface for setting serializer manager
 type WantsSerializerManager interface {
 	SetSerializerManager(s *serializer.SerializerManager) error
 }
@@ -55,6 +56,11 @@ type WantsMasterServiceAddr interface {
 	SetMasterServiceAddr(addr string) error
 }
 
+// WantsWorkingMode is an interface for setting working mode
+type WantsWorkingMode interface {
+	SetWorkingMode(mode util.WorkingMode) error
+}
+
 // genericFilterInitializer is responsible for initializing generic filter
 type genericFilterInitializer struct {
 	factory           informers.SharedInformerFactory
@@ -63,6 +69,7 @@ type genericFilterInitializer struct {
 	storageWrapper    cachemanager.StorageWrapper
 	nodeName          string
 	masterServiceAddr string
+	workingMode       util.WorkingMode
 }
 
 // New creates an filterInitializer object
@@ -71,7 +78,8 @@ func New(factory informers.SharedInformerFactory,
 	sm *serializer.SerializerManager,
 	sw cachemanager.StorageWrapper,
 	nodeName string,
-	masterServiceAddr string) *genericFilterInitializer {
+	masterServiceAddr string,
+	workingMode util.WorkingMode) *genericFilterInitializer {
 	return &genericFilterInitializer{
 		factory:     factory,
 		yurtFactory: yurtFactory,
@@ -79,11 +87,18 @@ func New(factory informers.SharedInformerFactory,
 		storageWrapper:    sw,
 		nodeName:          nodeName,
 		masterServiceAddr: masterServiceAddr,
+		workingMode:       workingMode,
 	}
 }
 
 // Initialize used for executing filter initialization
 func (fi *genericFilterInitializer) Initialize(ins filter.Interface) error {
+	if wants, ok := ins.(WantsWorkingMode); ok {
+		if err := wants.SetWorkingMode(fi.workingMode); err != nil {
+			return err
+		}
+	}
+
 	if wants, ok := ins.(WantsNodeName); ok {
 		if err := wants.SetNodeName(fi.nodeName); err != nil {
 			return err
diff --git a/pkg/yurthub/filter/interfaces.go b/pkg/yurthub/filter/interfaces.go
index a337c35f264..7c6c2dff4c7 100644
--- a/pkg/yurthub/filter/interfaces.go
+++ b/pkg/yurthub/filter/interfaces.go
@@ -38,4 +38,4 @@ type Handler interface {
 	ObjectResponseFilter(b []byte) ([]byte, error)
 }
 
-type NodeGetter func() (*v1.Node, error)
+type NodeGetter func(name string) (*v1.Node, error)
diff --git a/pkg/yurthub/filter/masterservice/filter.go b/pkg/yurthub/filter/masterservice/filter.go
index 40d0f653c8f..a2f7cca0c7f 100644
--- a/pkg/yurthub/filter/masterservice/filter.go
+++ b/pkg/yurthub/filter/masterservice/filter.go
@@ -70,14 +70,6 @@ func (msf *masterServiceFilter) SetMasterServiceAddr(addr string) error {
 	return nil
 }
 
-func (msf *masterServiceFilter) Approve(comp, resource, verb string) bool {
-	if !msf.Approver.Approve(comp, resource, verb) {
-		return false
-	}
-
-	return true
-}
-
 func (msf *masterServiceFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) {
 	s := filterutil.CreateSerializer(req, msf.serializerManager)
 	if s == nil {
diff --git a/pkg/yurthub/filter/servicetopology/filter.go b/pkg/yurthub/filter/servicetopology/filter.go
index b435398e478..4a21c3788de 100644
--- a/pkg/yurthub/filter/servicetopology/filter.go
+++ b/pkg/yurthub/filter/servicetopology/filter.go
@@ -33,6 +33,7 @@ import (
 	"github.com/openyurtio/openyurt/pkg/yurthub/filter"
 	filterutil "github.com/openyurtio/openyurt/pkg/yurthub/filter/util"
 	"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
+	"github.com/openyurtio/openyurt/pkg/yurthub/util"
 )
 
 // Register registers a filter
@@ -44,8 +45,9 @@ func Register(filters *filter.Filters) {
 
 func NewFilter() *serviceTopologyFilter {
 	return &serviceTopologyFilter{
-		Approver: filter.NewApprover("kube-proxy", "endpointslices", []string{"list", "watch"}...),
-		stopCh:   make(chan struct{}),
+		Approver:    filter.NewApprover("kube-proxy", "endpointslices", []string{"list", "watch"}...),
+		workingMode: util.WorkingModeEdge,
+		stopCh:      make(chan struct{}),
 	}
 }
 
@@ -58,14 +60,26 @@ type serviceTopologyFilter struct {
 	nodeGetter        filter.NodeGetter
 	nodeSynced        cache.InformerSynced
 	nodeName          string
+	workingMode       util.WorkingMode
 	serializerManager *serializer.SerializerManager
 	stopCh            chan struct{}
 }
 
+func (ssf *serviceTopologyFilter) SetWorkingMode(mode util.WorkingMode) error {
+	ssf.workingMode = mode
+	return nil
+}
+
 func (ssf *serviceTopologyFilter) SetSharedInformerFactory(factory informers.SharedInformerFactory) error {
 	ssf.serviceLister = factory.Core().V1().Services().Lister()
 	ssf.serviceSynced = factory.Core().V1().Services().Informer().HasSynced
 
+	if ssf.workingMode == util.WorkingModeCloud {
+		klog.Infof("prepare list/watch to sync node(%s) for cloud working mode", ssf.nodeName)
+		ssf.nodeSynced = factory.Core().V1().Nodes().Informer().HasSynced
+		ssf.nodeGetter = factory.Core().V1().Nodes().Lister().Get
+	}
+
 	return nil
 }
 
@@ -87,6 +101,12 @@ func (ssf *serviceTopologyFilter) SetStorageWrapper(s cachemanager.StorageWrappe
 		return fmt.Errorf("node name for serviceTopologyFilter is not ready")
 	}
 
+	// hub agent will list/watch the node from kube-apiserver when it works in cloud mode
+	if ssf.workingMode == util.WorkingModeCloud {
+		return nil
+	}
+	klog.Infof("prepare local disk storage to sync node(%s) for edge working mode", ssf.nodeName)
+
 	nodeKey := fmt.Sprintf("kubelet/nodes/%s", ssf.nodeName)
 	ssf.nodeSynced = func() bool {
 		obj, err := s.Get(nodeKey)
@@ -101,19 +121,19 @@ func (ssf *serviceTopologyFilter) SetStorageWrapper(s cachemanager.StorageWrappe
 		return true
 	}
 
-	ssf.nodeGetter = func() (*v1.Node, error) {
-		obj, err := s.Get(nodeKey)
+	ssf.nodeGetter = func(name string) (*v1.Node, error) {
+		obj, err := s.Get(fmt.Sprintf("kubelet/nodes/%s", name))
 		if err != nil {
 			return nil, err
 		} else if obj == nil {
-			return nil, fmt.Errorf("node(%s) is not ready", ssf.nodeName)
+			return nil, fmt.Errorf("node(%s) is not ready", name)
 		}
 
 		if node, ok := obj.(*v1.Node); ok {
 			return node, nil
 		}
 
-		return nil, fmt.Errorf("node(%s) is not found", ssf.nodeName)
+		return nil, fmt.Errorf("node(%s) is not found", name)
 	}
 
 	return nil
diff --git a/pkg/yurthub/filter/servicetopology/handler.go b/pkg/yurthub/filter/servicetopology/handler.go
index 4d5c8ac1dce..1e7d32432a4 100644
--- a/pkg/yurthub/filter/servicetopology/handler.go
+++ b/pkg/yurthub/filter/servicetopology/handler.go
@@ -153,7 +153,7 @@ func (fh *serviceTopologyFilterHandler) reassembleEndpointSlice(endpointSlice *d
 	} else if serviceTopologyType == AnnotationServiceTopologyValueNodePool || serviceTopologyType == AnnotationServiceTopologyValueZone {
 		// if type of service Topology is openyurt.io/nodepool
 		// filter the endpoint just on the node which is in the same nodepool with current node
-		currentNode, err := fh.nodeGetter()
+		currentNode, err := fh.nodeGetter(fh.nodeName)
 		if err != nil {
 			klog.Infof("skip reassemble endpointSlice, failed to get current node %s, err: %v", fh.nodeName, err)
 			return endpointSlice
diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go
index fc4d75f1ee4..b29e5ac45b6 100644
--- a/pkg/yurthub/proxy/proxy.go
+++ b/pkg/yurthub/proxy/proxy.go
@@ -28,7 +28,6 @@ import (
 	"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
 	"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
 	"github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces"
-	"github.com/openyurtio/openyurt/pkg/yurthub/filter"
 	"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
 	"github.com/openyurtio/openyurt/pkg/yurthub/proxy/local"
 	"github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote"
@@ -55,7 +54,6 @@ func NewYurtReverseProxyHandler(
 	transportMgr transport.Interface,
 	healthChecker healthchecker.HealthChecker,
 	certManager interfaces.YurtCertificateManager,
-	filterChain filter.Interface,
 	stopCh <-chan struct{}) (http.Handler, error) {
 	cfg := &server.Config{
 		LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix),
@@ -69,7 +67,7 @@ func NewYurtReverseProxyHandler(
 		transportMgr,
 		healthChecker,
 		certManager,
-		filterChain,
+		yurtHubCfg.FilterChain,
 		stopCh)
 	if err != nil {
 		return nil, err