From 669e004e8f52a3f3e94c895e75d307f56886bff9 Mon Sep 17 00:00:00 2001 From: rambohe Date: Mon, 19 Jun 2023 10:08:12 +0800 Subject: [PATCH] fix conflicts for getting node by local storage in yurthub filters (#1552) --- cmd/yurthub/app/config/config.go | 2 +- pkg/yurthub/filter/filter.go | 11 +- pkg/yurthub/filter/initializer/initializer.go | 36 ++-- pkg/yurthub/filter/manager/manager.go | 11 +- pkg/yurthub/filter/manager/manager_test.go | 54 +++--- .../filter/nodeportisolation/filter.go | 126 +++----------- .../filter/nodeportisolation/filter_test.go | 86 ++++------ pkg/yurthub/filter/servicetopology/filter.go | 118 +++---------- .../filter/servicetopology/filter_test.go | 161 +++++++++--------- 9 files changed, 206 insertions(+), 399 deletions(-) diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index b209a16118e..88f35cfa93a 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -139,7 +139,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { } tenantNs := util.ParseTenantNsFromOrgs(options.YurtHubCertOrganizations) registerInformers(options, sharedFactory, yurtSharedFactory, workingMode, tenantNs) - filterManager, err := manager.NewFilterManager(options, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, us[0].Host) + filterManager, err := manager.NewFilterManager(options, sharedFactory, yurtSharedFactory, proxiedClient, serializerManager, us[0].Host) if err != nil { klog.Errorf("could not create filter manager, %v", err) return nil, err diff --git a/pkg/yurthub/filter/filter.go b/pkg/yurthub/filter/filter.go index f9e0d1b09c6..9700fb4a3d6 100644 --- a/pkg/yurthub/filter/filter.go +++ b/pkg/yurthub/filter/filter.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "net/http" + "strings" "sync" "k8s.io/apimachinery/pkg/runtime" @@ -274,15 +275,11 @@ func CreateFilterChain(objFilters []ObjectFilter) ObjectFilter { } func (chain filterChain) Name() string { - var name string + var names []string for i := range chain { - if len(name) == 0 { - name = chain[i].Name() - } else { - name = "," + chain[i].Name() - } + names = append(names, chain[i].Name()) } - return name + return strings.Join(names, ",") } func (chain filterChain) SupportedResourceAndVerbs() map[string]sets.String { diff --git a/pkg/yurthub/filter/initializer/initializer.go b/pkg/yurthub/filter/initializer/initializer.go index fe120463a28..796c4a1b0d1 100644 --- a/pkg/yurthub/filter/initializer/initializer.go +++ b/pkg/yurthub/filter/initializer/initializer.go @@ -18,10 +18,9 @@ package initializer import ( "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" - "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/filter" - "github.com/openyurtio/openyurt/pkg/yurthub/util" yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" ) @@ -45,59 +44,46 @@ type WantsNodePoolName interface { SetNodePoolName(nodePoolName string) error } -// WantsStorageWrapper is an interface for setting StorageWrapper -type WantsStorageWrapper interface { - SetStorageWrapper(s cachemanager.StorageWrapper) error -} - // WantsMasterServiceAddr is an interface for setting mutated master service address type WantsMasterServiceAddr interface { SetMasterServiceHost(host string) error SetMasterServicePort(port string) error } -// WantsWorkingMode is an interface for setting working mode -type WantsWorkingMode interface { - SetWorkingMode(mode 
util.WorkingMode) error +// WantsKubeClient is an interface for setting kube client +type WantsKubeClient interface { + SetKubeClient(client kubernetes.Interface) error } // genericFilterInitializer is responsible for initializing generic filter type genericFilterInitializer struct { factory informers.SharedInformerFactory yurtFactory yurtinformers.SharedInformerFactory - storageWrapper cachemanager.StorageWrapper nodeName string nodePoolName string masterServiceHost string masterServicePort string - workingMode util.WorkingMode + client kubernetes.Interface } // New creates an filterInitializer object func New(factory informers.SharedInformerFactory, yurtFactory yurtinformers.SharedInformerFactory, - sw cachemanager.StorageWrapper, - nodeName, nodePoolName, masterServiceHost, masterServicePort string, - workingMode util.WorkingMode) *genericFilterInitializer { + kubeClient kubernetes.Interface, + nodeName, nodePoolName, masterServiceHost, masterServicePort string) *genericFilterInitializer { return &genericFilterInitializer{ factory: factory, yurtFactory: yurtFactory, - storageWrapper: sw, nodeName: nodeName, + nodePoolName: nodePoolName, masterServiceHost: masterServiceHost, masterServicePort: masterServicePort, - workingMode: workingMode, + client: kubeClient, } } // Initialize used for executing filter initialization func (fi *genericFilterInitializer) Initialize(ins filter.ObjectFilter) error { - if wants, ok := ins.(WantsWorkingMode); ok { - if err := wants.SetWorkingMode(fi.workingMode); err != nil { - return err - } - } - if wants, ok := ins.(WantsNodeName); ok { if err := wants.SetNodeName(fi.nodeName); err != nil { return err @@ -132,8 +118,8 @@ func (fi *genericFilterInitializer) Initialize(ins filter.ObjectFilter) error { } } - if wants, ok := ins.(WantsStorageWrapper); ok { - if err := wants.SetStorageWrapper(fi.storageWrapper); err != nil { + if wants, ok := ins.(WantsKubeClient); ok { + if err := wants.SetKubeClient(fi.client); err != nil { return err } } diff --git a/pkg/yurthub/filter/manager/manager.go b/pkg/yurthub/filter/manager/manager.go index a60f3bb3e1b..bc3d5f5dd91 100644 --- a/pkg/yurthub/filter/manager/manager.go +++ b/pkg/yurthub/filter/manager/manager.go @@ -23,9 +23,9 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "github.com/openyurtio/openyurt/cmd/yurthub/app/options" - "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice" "github.com/openyurtio/openyurt/pkg/yurthub/filter/inclusterconfig" @@ -47,8 +47,8 @@ type Manager struct { func NewFilterManager(options *options.YurtHubOptions, sharedFactory informers.SharedInformerFactory, yurtSharedFactory yurtinformers.SharedInformerFactory, + proxiedClient kubernetes.Interface, serializerManager *serializer.SerializerManager, - storageWrapper cachemanager.StorageWrapper, apiserverAddr string) (*Manager, error) { if !options.EnableResourceFilter { return nil, nil @@ -70,7 +70,7 @@ func NewFilterManager(options *options.YurtHubOptions, } } - objFilters, err := createObjectFilters(filters, sharedFactory, yurtSharedFactory, storageWrapper, util.WorkingMode(options.WorkingMode), options.NodeName, options.NodePoolName, mutatedMasterServiceHost, mutatedMasterServicePort) + objFilters, err := createObjectFilters(filters, sharedFactory, yurtSharedFactory, proxiedClient, options.NodeName, options.NodePoolName, mutatedMasterServiceHost, 
mutatedMasterServicePort) if err != nil { return nil, err } @@ -114,14 +114,13 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter, func createObjectFilters(filters *filter.Filters, sharedFactory informers.SharedInformerFactory, yurtSharedFactory yurtinformers.SharedInformerFactory, - storageWrapper cachemanager.StorageWrapper, - workingMode util.WorkingMode, + proxiedClient kubernetes.Interface, nodeName, nodePoolName, mutatedMasterServiceHost, mutatedMasterServicePort string) ([]filter.ObjectFilter, error) { if filters == nil { return nil, nil } - genericInitializer := initializer.New(sharedFactory, yurtSharedFactory, storageWrapper, nodeName, nodePoolName, mutatedMasterServiceHost, mutatedMasterServicePort, workingMode) + genericInitializer := initializer.New(sharedFactory, yurtSharedFactory, proxiedClient, nodeName, nodePoolName, mutatedMasterServiceHost, mutatedMasterServicePort) initializerChain := filter.Initializers{} initializerChain = append(initializerChain, genericInitializer) return filters.NewFromFilters(initializerChain) diff --git a/pkg/yurthub/filter/manager/manager_test.go b/pkg/yurthub/filter/manager/manager_test.go index 56e17923d6b..e702b7cd1b9 100644 --- a/pkg/yurthub/filter/manager/manager_test.go +++ b/pkg/yurthub/filter/manager/manager_test.go @@ -30,11 +30,9 @@ import ( "k8s.io/client-go/kubernetes/fake" "github.com/openyurtio/openyurt/cmd/yurthub/app/options" - "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" - "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" yurtfake "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned/fake" yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" ) @@ -42,19 +40,8 @@ import ( func TestFindResponseFilter(t *testing.T) { fakeClient := &fake.Clientset{} fakeYurtClient := &yurtfake.Clientset{} - sharedFactory, yurtSharedFactory := informers.NewSharedInformerFactory(fakeClient, 24*time.Hour), - yurtinformers.NewSharedInformerFactory(fakeYurtClient, 24*time.Hour) serializerManager := serializer.NewSerializerManager() - storageManager, err := disk.NewDiskStorage("/tmp/filter_manager") - if err != nil { - t.Fatalf("could not create storage manager, %v", err) - } - storageWrapper := cachemanager.NewStorageWrapper(storageManager) apiserverAddr := "127.0.0.1:6443" - stopper := make(chan struct{}) - defer close(stopper) - sharedFactory.Start(stopper) - yurtSharedFactory.Start(stopper) testcases := map[string]struct { enableResourceFilter bool @@ -67,7 +54,7 @@ func TestFindResponseFilter(t *testing.T) { path string mgrIsNil bool isFound bool - names []string + names sets.String }{ "disable resource filter": { enableResourceFilter: false, @@ -81,9 +68,9 @@ func TestFindResponseFilter(t *testing.T) { verb: "GET", path: "/api/v1/services", isFound: true, - names: []string{"masterservice"}, + names: sets.NewString("masterservice"), }, - "get discard cloud service filter": { + "get discard cloud service and node port isolation filter": { enableResourceFilter: true, accessServerThroughHub: true, enableDummyIf: true, @@ -91,7 +78,7 @@ func TestFindResponseFilter(t *testing.T) { verb: "GET", path: "/api/v1/services", isFound: true, - names: []string{"discardcloudservice"}, + names: sets.NewString("discardcloudservice", 
"nodeportisolation"), }, "get service topology filter": { enableResourceFilter: true, @@ -101,7 +88,7 @@ func TestFindResponseFilter(t *testing.T) { verb: "GET", path: "/api/v1/endpoints", isFound: true, - names: []string{"servicetopology"}, + names: sets.NewString("servicetopology"), }, "disable service topology filter": { enableResourceFilter: true, @@ -120,7 +107,8 @@ func TestFindResponseFilter(t *testing.T) { userAgent: "kube-proxy", verb: "GET", path: "/api/v1/services", - isFound: false, + isFound: true, + names: sets.NewString("nodeportisolation"), }, } @@ -140,14 +128,19 @@ func TestFindResponseFilter(t *testing.T) { } options.DisabledResourceFilters = append(options.DisabledResourceFilters, tt.disabledResourceFilters...) - mgr, _ := NewFilterManager(options, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, apiserverAddr) - if tt.mgrIsNil && mgr != nil { - t.Errorf("expect manager is nil, but got not nil: %v", mgr) - } else { - // mgr is nil, complete this test case + sharedFactory, yurtSharedFactory := informers.NewSharedInformerFactory(fakeClient, 24*time.Hour), + yurtinformers.NewSharedInformerFactory(fakeYurtClient, 24*time.Hour) + stopper := make(chan struct{}) + defer close(stopper) + + mgr, _ := NewFilterManager(options, sharedFactory, yurtSharedFactory, fakeClient, serializerManager, apiserverAddr) + if tt.mgrIsNil && mgr == nil { return } + sharedFactory.Start(stopper) + yurtSharedFactory.Start(stopper) + req, err := http.NewRequest(tt.verb, tt.path, nil) if err != nil { t.Errorf("failed to create request, %v", err) @@ -168,20 +161,13 @@ func TestFindResponseFilter(t *testing.T) { handler = filters.WithRequestInfo(handler, resolver) handler.ServeHTTP(httptest.NewRecorder(), req) - if isFound != tt.isFound { - t.Errorf("expect isFound %v, but got %v", tt.isFound, isFound) + if !tt.isFound && isFound == tt.isFound { return } names := strings.Split(responseFilter.Name(), ",") - if len(tt.names) != len(names) { - t.Errorf("expect filter names %v, but got %v", tt.names, names) - } - - for i := range tt.names { - if tt.names[i] != names[i] { - t.Errorf("expect filter names %v, but got %v", tt.names, names) - } + if !tt.names.Equal(sets.NewString(names...)) { + t.Errorf("expect filter names %v, but got %v", tt.names.List(), names) } }) } diff --git a/pkg/yurthub/filter/nodeportisolation/filter.go b/pkg/yurthub/filter/nodeportisolation/filter.go index f7088ce3435..c4c9ff95407 100644 --- a/pkg/yurthub/filter/nodeportisolation/filter.go +++ b/pkg/yurthub/filter/nodeportisolation/filter.go @@ -17,22 +17,19 @@ limitations under the License. 
package nodeportisolation import ( + "context" "fmt" "strings" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" - "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + "github.com/openyurtio/openyurt/pkg/apis/apps" "github.com/openyurtio/openyurt/pkg/yurthub/filter" - "github.com/openyurtio/openyurt/pkg/yurthub/storage" - "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" - "github.com/openyurtio/openyurt/pkg/yurthub/util" - nodepoolv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1" ) const ( @@ -48,10 +45,8 @@ func Register(filters *filter.Filters) { type nodePortIsolationFilter struct { nodePoolName string - workingMode util.WorkingMode - nodeGetter filter.NodeGetter - nodeSynced cache.InformerSynced nodeName string + client kubernetes.Interface } func (nif *nodePortIsolationFilter) Name() string { @@ -74,92 +69,12 @@ func (nif *nodePortIsolationFilter) SetNodeName(nodeName string) error { return nil } -func (nif *nodePortIsolationFilter) SetWorkingMode(mode util.WorkingMode) error { - nif.workingMode = mode - return nil -} - -func (nif *nodePortIsolationFilter) SetSharedInformerFactory(factory informers.SharedInformerFactory) error { - if nif.workingMode == util.WorkingModeCloud { - klog.Infof("prepare list/watch to sync node(%s) for cloud working mode", nif.nodeName) - nif.nodeSynced = factory.Core().V1().Nodes().Informer().HasSynced - nif.nodeGetter = factory.Core().V1().Nodes().Lister().Get - } - - return nil -} - -func (nif *nodePortIsolationFilter) SetStorageWrapper(s cachemanager.StorageWrapper) error { - if s.Name() != disk.StorageName { - return fmt.Errorf("nodePortIsolationFilter can only support disk storage currently, cannot use %s", s.Name()) - } - - if len(nif.nodeName) == 0 { - return fmt.Errorf("node name for nodePortIsolationFilter is not set") - } - - // hub agent will list/watch node from kube-apiserver when hub agent work as cloud mode - if nif.workingMode == util.WorkingModeCloud { - return nil - } - klog.Infof("prepare local disk storage to sync node(%s) for edge working mode", nif.nodeName) - - nodeKey, err := s.KeyFunc(storage.KeyBuildInfo{ - Component: "kubelet", - Name: nif.nodeName, - Resources: "nodes", - Group: "", - Version: "v1", - }) - if err != nil { - return fmt.Errorf("failed to get node key for %s, %v", nif.nodeName, err) - } - nif.nodeSynced = func() bool { - obj, err := s.Get(nodeKey) - if err != nil || obj == nil { - return false - } - - if _, ok := obj.(*v1.Node); !ok { - return false - } - - return true - } - - nif.nodeGetter = func(name string) (*v1.Node, error) { - key, err := s.KeyFunc(storage.KeyBuildInfo{ - Component: "kubelet", - Name: name, - Resources: "nodes", - Group: "", - Version: "v1", - }) - if err != nil { - return nil, fmt.Errorf("nodeGetter failed to get node key for %s, %v", name, err) - } - obj, err := s.Get(key) - if err != nil { - return nil, err - } else if obj == nil { - return nil, fmt.Errorf("node(%s) is not ready", name) - } - - if node, ok := obj.(*v1.Node); ok { - return node, nil - } - - return nil, fmt.Errorf("node(%s) is not found", name) - } - +func (nif *nodePortIsolationFilter) SetKubeClient(client kubernetes.Interface) error { + nif.client = client return nil } func (nif *nodePortIsolationFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) 
runtime.Object { - if ok := cache.WaitForCacheSync(stopCh, nif.nodeSynced); !ok { - return obj - } - switch v := obj.(type) { case *v1.ServiceList: var svcNew []v1.Service @@ -179,16 +94,7 @@ func (nif *nodePortIsolationFilter) Filter(obj runtime.Object, stopCh <-chan str } func (nif *nodePortIsolationFilter) isolateNodePortService(svc *v1.Service) *v1.Service { - nodePoolName := nif.nodePoolName - if len(nodePoolName) == 0 { - node, err := nif.nodeGetter(nif.nodeName) - if err != nil { - klog.Warningf("skip isolateNodePortService filter, failed to get node(%s), %v", nif.nodeName, err) - return svc - } - nodePoolName = node.Labels[nodepoolv1alpha1.LabelCurrentNodePool] - } - + nodePoolName := nif.resolveNodePoolName() // node is not located in NodePool, keep the NodePort service the same as native K8s if len(nodePoolName) == 0 { return svc @@ -201,7 +107,7 @@ func (nif *nodePortIsolationFilter) isolateNodePortService(svc *v1.Service) *v1. if nodePoolConf.Len() != 0 && isNodePoolEnabled(nodePoolConf, nodePoolName) { return svc } else { - klog.V(2).Infof("nodePort service(%s) is disabled in nodePool(%s) by nodePortIsolationFilter", nsName, nodePoolName) + klog.V(2).Infof("service(%s) is disabled in nodePool(%s) by nodePortIsolationFilter", nsName, nodePoolName) return nil } } @@ -210,6 +116,20 @@ func (nif *nodePortIsolationFilter) isolateNodePortService(svc *v1.Service) *v1. return svc } +func (nif *nodePortIsolationFilter) resolveNodePoolName() string { + if len(nif.nodePoolName) != 0 { + return nif.nodePoolName + } + + node, err := nif.client.CoreV1().Nodes().Get(context.Background(), nif.nodeName, metav1.GetOptions{}) + if err != nil { + klog.Warningf("skip isolateNodePortService filter, failed to get node(%s), %v", nif.nodeName, err) + return nif.nodePoolName + } + nif.nodePoolName = node.Labels[apps.LabelDesiredNodePool] + return nif.nodePoolName +} + func getNodePoolConfiguration(v string) sets.String { nodePoolConf := sets.NewString() nodePoolsForValidation := sets.NewString() diff --git a/pkg/yurthub/filter/nodeportisolation/filter_test.go b/pkg/yurthub/filter/nodeportisolation/filter_test.go index 8374ab230a6..871a0e33e15 100644 --- a/pkg/yurthub/filter/nodeportisolation/filter_test.go +++ b/pkg/yurthub/filter/nodeportisolation/filter_test.go @@ -24,14 +24,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + "github.com/openyurtio/openyurt/pkg/apis/apps" "github.com/openyurtio/openyurt/pkg/util" - "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/filter" - "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" - hubutil "github.com/openyurtio/openyurt/pkg/yurthub/util" ) func TestName(t *testing.T) { @@ -81,57 +78,37 @@ func TestSetNodeName(t *testing.T) { } } -func TestSetWorkingMode(t *testing.T) { - nif := &nodePortIsolationFilter{} - if err := nif.SetWorkingMode(hubutil.WorkingMode("cloud")); err != nil { - t.Errorf("expect nil, but got %v", err) - } - - if nif.workingMode != hubutil.WorkingModeCloud { - t.Errorf("expect working mode: cloud, but got %s", nif.workingMode) - } -} - -func TestSetSharedInformerFactory(t *testing.T) { +func TestSetKubeClient(t *testing.T) { client := &fake.Clientset{} - informerFactory := informers.NewSharedInformerFactory(client, 0) - nif := &nodePortIsolationFilter{ - workingMode: "cloud", - } - if err := 
nif.SetSharedInformerFactory(informerFactory); err != nil { - t.Errorf("expect nil, but got %v", err) - } -} - -func TestSetStorageWrapper(t *testing.T) { - nif := &nodePortIsolationFilter{ - workingMode: "edge", - nodeName: "foo", - } - storageManager, err := disk.NewDiskStorage("/tmp/nif-filter") - if err != nil { - t.Fatalf("could not create storage manager, %v", err) - } - storageWrapper := cachemanager.NewStorageWrapper(storageManager) - - if err := nif.SetStorageWrapper(storageWrapper); err != nil { + nif := &nodePortIsolationFilter{} + if err := nif.SetKubeClient(client); err != nil { t.Errorf("expect nil, but got %v", err) } } func TestFilter(t *testing.T) { nodePoolName := "foo" - node := &corev1.Node{ + nodeFoo := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", + Annotations: map[string]string{ + apps.LabelDesiredNodePool: nodePoolName, + }, + }, + } + nodeBar := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bar", }, } testcases := map[string]struct { - isOrphanNodes bool - responseObj runtime.Object - expectObj runtime.Object + poolName string + nodeName string + responseObj runtime.Object + expectObj runtime.Object }{ "enable NodePort service listening on nodes in foo and bar NodePool.": { + poolName: nodePoolName, responseObj: &corev1.ServiceList{ Items: []corev1.Service{ { @@ -188,6 +165,7 @@ func TestFilter(t *testing.T) { }, }, "enable NodePort service listening on nodes of all NodePools": { + nodeName: "foo", responseObj: &corev1.ServiceList{ Items: []corev1.Service{ { @@ -244,6 +222,7 @@ func TestFilter(t *testing.T) { }, }, "disable NodePort service listening on nodes of all NodePools": { + poolName: nodePoolName, responseObj: &corev1.ServiceList{ Items: []corev1.Service{ { @@ -277,6 +256,7 @@ func TestFilter(t *testing.T) { expectObj: &corev1.ServiceList{}, }, "disable NodePort service listening only on nodes in foo NodePool": { + poolName: nodePoolName, responseObj: &corev1.ServiceList{ Items: []corev1.Service{ { @@ -320,6 +300,7 @@ func TestFilter(t *testing.T) { }, }, "disable nodeport service": { + poolName: nodePoolName, responseObj: &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "svc1", @@ -336,6 +317,7 @@ func TestFilter(t *testing.T) { expectObj: nil, }, "duplicated node pool configuration": { + nodeName: "foo", responseObj: &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "svc1", @@ -364,6 +346,7 @@ func TestFilter(t *testing.T) { }, }, "disable NodePort service listening on nodes of foo NodePool": { + poolName: nodePoolName, responseObj: &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "svc1", @@ -380,7 +363,7 @@ func TestFilter(t *testing.T) { expectObj: nil, }, "enable nodeport service on orphan nodes": { - isOrphanNodes: true, + nodeName: "bar", responseObj: &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "svc1", @@ -409,6 +392,7 @@ func TestFilter(t *testing.T) { }, }, "disable NodePort service listening if no value configured": { + poolName: nodePoolName, responseObj: &corev1.ServiceList{ Items: []corev1.Service{ { @@ -442,6 +426,7 @@ func TestFilter(t *testing.T) { expectObj: &corev1.ServiceList{}, }, "skip podList": { + poolName: nodePoolName, responseObj: &corev1.PodList{ Items: []corev1.Pod{ { @@ -486,17 +471,16 @@ func TestFilter(t *testing.T) { for k, tc := range testcases { t.Run(k, func(t *testing.T) { nif := &nodePortIsolationFilter{} - if !tc.isOrphanNodes { - nif.nodePoolName = nodePoolName - } else { - nif.nodeName = "foo" - nif.nodeGetter = func(name string) (*corev1.Node, error) { - return node, nil 
- } + if len(tc.poolName) != 0 { + nif.nodePoolName = tc.poolName } - nif.nodeSynced = func() bool { - return true + + if len(tc.nodeName) != 0 { + nif.nodeName = tc.nodeName + client := fake.NewSimpleClientset(nodeFoo, nodeBar) + nif.client = client } + newObj := nif.Filter(tc.responseObj, stopCh) if tc.expectObj == nil { if !util.IsNil(newObj) { diff --git a/pkg/yurthub/filter/servicetopology/filter.go b/pkg/yurthub/filter/servicetopology/filter.go index d25a3a8dd96..f791ad7cfdb 100644 --- a/pkg/yurthub/filter/servicetopology/filter.go +++ b/pkg/yurthub/filter/servicetopology/filter.go @@ -17,23 +17,21 @@ limitations under the License. package servicetopology import ( - "fmt" + "context" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" discoveryV1beta1 "k8s.io/api/discovery/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/filter" - "github.com/openyurtio/openyurt/pkg/yurthub/storage" - "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" - "github.com/openyurtio/openyurt/pkg/yurthub/util" nodepoolv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1" yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1" @@ -58,9 +56,7 @@ func Register(filters *filter.Filters) { } func NewFilter() *serviceTopologyFilter { - return &serviceTopologyFilter{ - workingMode: util.WorkingModeEdge, - } + return &serviceTopologyFilter{} } type serviceTopologyFilter struct { @@ -68,10 +64,9 @@ type serviceTopologyFilter struct { serviceSynced cache.InformerSynced nodePoolLister appslisters.NodePoolLister nodePoolSynced cache.InformerSynced - nodeGetter filter.NodeGetter - nodeSynced cache.InformerSynced + nodePoolName string nodeName string - workingMode util.WorkingMode + client kubernetes.Interface } func (stf *serviceTopologyFilter) Name() string { @@ -85,21 +80,10 @@ func (stf *serviceTopologyFilter) SupportedResourceAndVerbs() map[string]sets.St } } -func (stf *serviceTopologyFilter) SetWorkingMode(mode util.WorkingMode) error { - stf.workingMode = mode - return nil -} - func (stf *serviceTopologyFilter) SetSharedInformerFactory(factory informers.SharedInformerFactory) error { stf.serviceLister = factory.Core().V1().Services().Lister() stf.serviceSynced = factory.Core().V1().Services().Informer().HasSynced - if stf.workingMode == util.WorkingModeCloud { - klog.Infof("prepare list/watch to sync node(%s) for cloud working mode", stf.nodeName) - stf.nodeSynced = factory.Core().V1().Nodes().Informer().HasSynced - stf.nodeGetter = factory.Core().V1().Nodes().Lister().Get - } - return nil } @@ -116,76 +100,32 @@ func (stf *serviceTopologyFilter) SetNodeName(nodeName string) error { return nil } -// TODO: should use disk storage as parameter instead of StorageWrapper -// we can internally construct a new StorageWrapper with passed-in disk storage -func (stf *serviceTopologyFilter) SetStorageWrapper(s cachemanager.StorageWrapper) error { - if s.Name() != disk.StorageName { - return fmt.Errorf("serviceTopologyFilter can only support disk storage currently, cannot use %s", 
s.Name()) - } - - if len(stf.nodeName) == 0 { - return fmt.Errorf("node name for serviceTopologyFilter is not ready") - } - - // hub agent will list/watch node from kube-apiserver when hub agent work as cloud mode - if stf.workingMode == util.WorkingModeCloud { - return nil - } - klog.Infof("prepare local disk storage to sync node(%s) for edge working mode", stf.nodeName) - - nodeKey, err := s.KeyFunc(storage.KeyBuildInfo{ - Component: "kubelet", - Name: stf.nodeName, - Resources: "nodes", - Group: "", - Version: "v1", - }) - if err != nil { - return fmt.Errorf("failed to get node key for %s, %v", stf.nodeName, err) - } - stf.nodeSynced = func() bool { - obj, err := s.Get(nodeKey) - if err != nil || obj == nil { - return false - } +func (stf *serviceTopologyFilter) SetNodePoolName(poolName string) error { + stf.nodePoolName = poolName + return nil +} - if _, ok := obj.(*v1.Node); !ok { - return false - } +func (stf *serviceTopologyFilter) SetKubeClient(client kubernetes.Interface) error { + stf.client = client + return nil +} - return true +func (stf *serviceTopologyFilter) resolveNodePoolName() string { + if len(stf.nodePoolName) != 0 { + return stf.nodePoolName } - stf.nodeGetter = func(name string) (*v1.Node, error) { - key, err := s.KeyFunc(storage.KeyBuildInfo{ - Component: "kubelet", - Name: name, - Resources: "nodes", - Group: "", - Version: "v1", - }) - if err != nil { - return nil, fmt.Errorf("nodeGetter failed to get node key for %s, %v", name, err) - } - obj, err := s.Get(key) - if err != nil { - return nil, err - } else if obj == nil { - return nil, fmt.Errorf("node(%s) is not ready", name) - } - - if node, ok := obj.(*v1.Node); ok { - return node, nil - } - - return nil, fmt.Errorf("node(%s) is not found", name) + node, err := stf.client.CoreV1().Nodes().Get(context.Background(), stf.nodeName, metav1.GetOptions{}) + if err != nil { + klog.Warningf("failed to get node(%s) in serviceTopologyFilter filter, %v", stf.nodeName, err) + return stf.nodePoolName } - - return nil + stf.nodePoolName = node.Labels[nodepoolv1alpha1.LabelDesiredNodePool] + return stf.nodePoolName } func (stf *serviceTopologyFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { - if ok := cache.WaitForCacheSync(stopCh, stf.nodeSynced, stf.serviceSynced, stf.nodePoolSynced); !ok { + if ok := cache.WaitForCacheSync(stopCh, stf.serviceSynced, stf.nodePoolSynced); !ok { return obj } @@ -282,14 +222,8 @@ func (stf *serviceTopologyFilter) nodeTopologyHandler(obj runtime.Object) runtim } func (stf *serviceTopologyFilter) nodePoolTopologyHandler(obj runtime.Object) runtime.Object { - currentNode, err := stf.nodeGetter(stf.nodeName) - if err != nil { - klog.Warningf("skip serviceTopologyFilterHandler, failed to get current node %s, err: %v", stf.nodeName, err) - return obj - } - - nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool] - if !ok || len(nodePoolName) == 0 { + nodePoolName := stf.resolveNodePoolName() + if len(nodePoolName) == 0 { klog.Infof("node(%s) is not added into node pool, so fall into node topology", stf.nodeName) return stf.nodeTopologyHandler(obj) } diff --git a/pkg/yurthub/filter/servicetopology/filter_test.go b/pkg/yurthub/filter/servicetopology/filter_test.go index 2f62ecab639..b86e7c5d074 100644 --- a/pkg/yurthub/filter/servicetopology/filter_test.go +++ b/pkg/yurthub/filter/servicetopology/filter_test.go @@ -17,7 +17,6 @@ limitations under the License. 
package servicetopology import ( - "context" "reflect" "testing" "time" @@ -69,12 +68,15 @@ func TestFilter(t *testing.T) { nodeName3 := "node3" testcases := map[string]struct { + poolName string + nodeName string responseObject runtime.Object kubeClient *k8sfake.Clientset yurtClient *yurtfake.Clientset expectObject runtime.Object }{ "v1beta1.EndpointSliceList: topologyKeys is kubernetes.io/hostname": { + poolName: "hangzhou", responseObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { @@ -127,7 +129,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -135,7 +137,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -143,7 +145,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -219,6 +221,7 @@ func TestFilter(t *testing.T) { }, }, "v1beta1.EndpointSliceList: topologyKeys is openyurt.io/nodepool": { + poolName: "hangzhou", responseObject: &discoveryV1beta1.EndpointSliceList{ Items: []discoveryV1beta1.EndpointSlice{ { @@ -271,7 +274,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -279,7 +282,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -287,7 +290,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -423,7 +426,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -431,7 +434,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -439,7 +442,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -575,7 +578,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -583,7 +586,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -591,7 +594,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: 
map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -739,7 +742,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -747,7 +750,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -874,7 +877,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -882,7 +885,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -890,7 +893,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1000,7 +1003,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -1008,7 +1011,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1016,7 +1019,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1123,7 +1126,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -1131,7 +1134,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1139,7 +1142,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1272,7 +1275,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1280,7 +1283,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -1288,7 +1291,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1404,7 
+1407,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1412,7 +1415,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -1420,7 +1423,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1542,7 +1545,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1550,7 +1553,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -1558,7 +1561,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1680,7 +1683,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1688,7 +1691,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -1696,7 +1699,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1828,7 +1831,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -1836,7 +1839,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1939,7 +1942,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -1947,7 +1950,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -1955,7 +1958,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -2045,7 +2048,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - 
nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -2053,7 +2056,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -2061,7 +2064,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -2160,7 +2163,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -2168,7 +2171,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -2176,7 +2179,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -2290,7 +2293,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -2298,7 +2301,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -2306,7 +2309,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -2424,7 +2427,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -2432,7 +2435,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -2440,7 +2443,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -2558,7 +2561,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -2566,7 +2569,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -2574,7 +2577,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -2700,7 +2703,7 @@ 
func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -2708,7 +2711,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -2813,7 +2816,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -2821,7 +2824,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -2829,7 +2832,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -2921,7 +2924,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -2929,7 +2932,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -2937,7 +2940,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -3029,7 +3032,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -3037,7 +3040,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -3045,7 +3048,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -3129,7 +3132,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: currentNodeName, Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "hangzhou", + nodepoolv1alpha1.LabelDesiredNodePool: "hangzhou", }, }, }, @@ -3137,7 +3140,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -3145,7 +3148,7 @@ func TestFilter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - nodepoolv1alpha1.LabelCurrentNodePool: "shanghai", + nodepoolv1alpha1.LabelDesiredNodePool: "shanghai", }, }, }, @@ -3220,24 +3223,22 @@ func TestFilter(t *testing.T) { yurtFactory.Start(stopper2) yurtFactory.WaitForCacheSync(stopper2) - nodeGetter := func(name string) 
(*corev1.Node, error) { - return tt.kubeClient.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{}) - } - - nodeSynced := func() bool { - return true - } - stopCh := make(<-chan struct{}) stf := &serviceTopologyFilter{ nodeName: currentNodeName, serviceLister: serviceLister, nodePoolLister: nodePoolLister, - nodeGetter: nodeGetter, serviceSynced: serviceSynced, nodePoolSynced: nodePoolSynced, - nodeSynced: nodeSynced, + client: tt.kubeClient, } + + if len(tt.poolName) != 0 { + stf.nodePoolName = tt.poolName + } else { + stf.nodeName = currentNodeName + } + newObj := stf.Filter(tt.responseObject, stopCh) if util.IsNil(newObj) { t.Errorf("empty object is returned")
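
Note on the pattern this patch introduces (not part of the patch itself): instead of wiring a StorageWrapper and working mode into each filter, the nodeportisolation and servicetopology filters now receive a kubernetes.Interface via the new WantsKubeClient initializer hook and resolve the node's pool name lazily from the node labels. A minimal standalone sketch of that lookup is below; it uses a fake client and an assumed label key, and the package/type names outside the patched files are illustrative only.

```go
// Sketch of the client-based node-pool resolution used by the patched filters.
// The label key "apps.openyurt.io/desired-nodepool" is an assumption here.
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

type poolResolver struct {
	nodeName     string
	nodePoolName string
	client       kubernetes.Interface
}

// resolve returns the cached pool name, or fetches the node once through the
// injected client and reads the pool label, mirroring resolveNodePoolName in
// the patch. On error it returns the (possibly empty) cached value so callers
// can fall back to per-node behavior.
func (r *poolResolver) resolve(labelKey string) string {
	if len(r.nodePoolName) != 0 {
		return r.nodePoolName
	}
	node, err := r.client.CoreV1().Nodes().Get(context.Background(), r.nodeName, metav1.GetOptions{})
	if err != nil {
		return r.nodePoolName
	}
	r.nodePoolName = node.Labels[labelKey]
	return r.nodePoolName
}

func main() {
	// Fake client seeded with a node that carries the assumed pool label.
	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{
		Name:   "foo",
		Labels: map[string]string{"apps.openyurt.io/desired-nodepool": "hangzhou"},
	}}
	r := &poolResolver{nodeName: "foo", client: fake.NewSimpleClientset(node)}
	fmt.Println(r.resolve("apps.openyurt.io/desired-nodepool")) // prints "hangzhou"
}
```

In yurthub the injected client is the proxied client created in config.Complete, presumably so that on edge nodes the node Get is answered from yurthub's local cache when the cloud is unreachable, which is why the filters no longer need direct disk-storage access or a working-mode switch.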