From 1df5657e162ed16ee663dedf73929025bece4a37 Mon Sep 17 00:00:00 2001 From: "bingchang.tbc" Date: Mon, 25 Mar 2024 10:18:43 +0800 Subject: [PATCH] add pool service controller. --- .../yurt-manager-auto-generated.yaml | 46 +- .../app/options/loadbalancersetcontroller.go | 64 ++ cmd/yurt-manager/app/options/options.go | 7 + .../app/options/poolservicecontroller.go | 60 -- cmd/yurt-manager/names/controller_names.go | 3 +- .../network/v1alpha1/groupversion_info.go | 4 +- .../network/v1alpha1/poolservice_types.go | 2 +- .../network/v1alpha1/zz_generated.deepcopy.go | 13 +- .../controller/apis/config/types.go | 6 +- pkg/yurtmanager/controller/controller.go | 2 + .../loadbalancerset/annotations.go | 167 +++ .../loadbalancerset}/config/types.go | 5 +- .../loadbalancerset/event_handler.go | 143 +++ .../loadbalancerset/event_handler_test.go | 211 ++++ .../loadbalancerset/finalizer.go | 46 + .../loadbalancerset/loadbalancer_status.go | 49 + .../loadbalancerset_controller.go | 507 +++++++++ .../loadbalancerset_controller_test.go | 980 ++++++++++++++++++ .../loadbalancerset/predicate.go | 255 +++++ .../loadbalancerset/predicate_test.go | 263 +++++ .../poolservice/poolservice_controller.go | 136 --- .../v1alpha1/poolservice_default.go | 39 - .../v1alpha1/poolservice_handler.go | 56 - .../v1alpha1/poolservice_validation.go | 69 -- 24 files changed, 2711 insertions(+), 422 deletions(-) create mode 100644 cmd/yurt-manager/app/options/loadbalancersetcontroller.go delete mode 100644 cmd/yurt-manager/app/options/poolservicecontroller.go create mode 100644 pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/annotations.go rename pkg/yurtmanager/controller/{poolservice => loadbalancerset/loadbalancerset}/config/types.go (75%) create mode 100644 pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/event_handler.go create mode 100644 pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/event_handler_test.go create mode 100644 pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/finalizer.go create mode 100644 pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/loadbalancer_status.go create mode 100644 pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/loadbalancerset_controller.go create mode 100644 pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/loadbalancerset_controller_test.go create mode 100644 pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/predicate.go create mode 100644 pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/predicate_test.go delete mode 100644 pkg/yurtmanager/controller/poolservice/poolservice_controller.go delete mode 100644 pkg/yurtmanager/webhook/poolservice/v1alpha1/poolservice_default.go delete mode 100644 pkg/yurtmanager/webhook/poolservice/v1alpha1/poolservice_handler.go delete mode 100644 pkg/yurtmanager/webhook/poolservice/v1alpha1/poolservice_validation.go diff --git a/charts/yurt-manager/templates/yurt-manager-auto-generated.yaml b/charts/yurt-manager/templates/yurt-manager-auto-generated.yaml index 1870d8b5035..3fe9f17374d 100644 --- a/charts/yurt-manager/templates/yurt-manager-auto-generated.yaml +++ b/charts/yurt-manager/templates/yurt-manager-auto-generated.yaml @@ -500,7 +500,7 @@ rules: - patch - update - apiGroups: - - net.openyurt.io + - network.openyurt.io resources: - poolservices verbs: @@ -512,7 +512,7 @@ rules: - update - watch - apiGroups: - - net.openyurt.io + - network.openyurt.io resources: - poolservices/status verbs: @@ -756,27 +756,6 @@ webhooks: resources: - yurtstaticsets sideEffects: None -- admissionReviewVersions: - - v1 - - v1beta1 - clientConfig: - service: - name: yurt-manager-webhook-service - namespace: {{ .Release.Namespace }} - path: /mutate-net-openyurt-io-poolservice - failurePolicy: Fail - name: mutate.net.v1alpha1.poolservice.openyurt.io - rules: - - apiGroups: - - net.openyurt.io - apiVersions: - - v1alpha1 - operations: - - CREATE - - UPDATE - resources: - - poolservices - sideEffects: None --- apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration @@ -951,24 +930,3 @@ webhooks: resources: - yurtstaticsets sideEffects: None -- admissionReviewVersions: - - v1 - - v1beta1 - clientConfig: - service: - name: yurt-manager-webhook-service - namespace: {{ .Release.Namespace }} - path: /validate-net-openyurt-io-poolservice - failurePolicy: Fail - name: validate.net.v1alpha1.poolservice.openyurt.io - rules: - - apiGroups: - - net.openyurt.io - apiVersions: - - v1alpha1 - operations: - - CREATE - - UPDATE - resources: - - poolservices - sideEffects: None diff --git a/cmd/yurt-manager/app/options/loadbalancersetcontroller.go b/cmd/yurt-manager/app/options/loadbalancersetcontroller.go new file mode 100644 index 00000000000..b07c2bdfa02 --- /dev/null +++ b/cmd/yurt-manager/app/options/loadbalancersetcontroller.go @@ -0,0 +1,64 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "github.com/spf13/pflag" + + "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/config" +) + +type LoadBalancerSetControllerOptions struct { + *config.LoadBalancerSetControllerConfiguration +} + +func NewLoadBalancerSetControllerOptions() *LoadBalancerSetControllerOptions { + return &LoadBalancerSetControllerOptions{ + &config.LoadBalancerSetControllerConfiguration{ + ConcurrentLoadBalancerSetWorkers: 3, + }, + } +} + +// AddFlags adds flags related to poolservice for yurt-manager to the specified FlagSet. +func (n *LoadBalancerSetControllerOptions) AddFlags(fs *pflag.FlagSet) { + if n == nil { + return + } + + fs.Int32Var(&n.ConcurrentLoadBalancerSetWorkers, "concurrent-load-balancer-set-workers", n.ConcurrentLoadBalancerSetWorkers, "The number of load-balancer-set service objects that are allowed to reconcile concurrently. Larger number = more responsive load-balancer-set services, but more CPU (and network) load") +} + +// ApplyTo fills up poolservice config with options. +func (o *LoadBalancerSetControllerOptions) ApplyTo(cfg *config.LoadBalancerSetControllerConfiguration) error { + if o == nil { + return nil + } + + cfg.ConcurrentLoadBalancerSetWorkers = o.ConcurrentLoadBalancerSetWorkers + + return nil +} + +// Validate checks validation of LoadBalancerSetControllerOptions. +func (o *LoadBalancerSetControllerOptions) Validate() []error { + if o == nil { + return nil + } + errs := []error{} + return errs +} diff --git a/cmd/yurt-manager/app/options/options.go b/cmd/yurt-manager/app/options/options.go index 1341086fc33..932fe76e030 100644 --- a/cmd/yurt-manager/app/options/options.go +++ b/cmd/yurt-manager/app/options/options.go @@ -39,6 +39,7 @@ type YurtManagerOptions struct { YurtAppOverriderController *YurtAppOverriderControllerOptions NodeLifeCycleController *NodeLifecycleControllerOptions NodeBucketController *NodeBucketControllerOptions + LoadBalancerSetController *LoadBalancerSetControllerOptions } // NewYurtManagerOptions creates a new YurtManagerOptions with a default config. @@ -59,6 +60,7 @@ func NewYurtManagerOptions() (*YurtManagerOptions, error) { YurtAppOverriderController: NewYurtAppOverriderControllerOptions(), NodeLifeCycleController: NewNodeLifecycleControllerOptions(), NodeBucketController: NewNodeBucketControllerOptions(), + LoadBalancerSetController: NewLoadBalancerSetControllerOptions(), } return &s, nil @@ -79,6 +81,7 @@ func (y *YurtManagerOptions) Flags(allControllers, disabledByDefaultControllers y.YurtAppOverriderController.AddFlags(fss.FlagSet("yurtappoverrider controller")) y.NodeLifeCycleController.AddFlags(fss.FlagSet("nodelifecycle controller")) y.NodeBucketController.AddFlags(fss.FlagSet("nodebucket controller")) + y.LoadBalancerSetController.AddFlags(fss.FlagSet("loadbalancerset controller")) return fss } @@ -98,6 +101,7 @@ func (y *YurtManagerOptions) Validate(allControllers []string, controllerAliases errs = append(errs, y.YurtAppOverriderController.Validate()...) errs = append(errs, y.NodeLifeCycleController.Validate()...) errs = append(errs, y.NodeBucketController.Validate()...) + errs = append(errs, y.LoadBalancerSetController.Validate()...) return utilerrors.NewAggregate(errs) } @@ -142,6 +146,9 @@ func (y *YurtManagerOptions) ApplyTo(c *config.Config, controllerAliases map[str if err := y.NodeBucketController.ApplyTo(&c.ComponentConfig.NodeBucketController); err != nil { return err } + if err := y.LoadBalancerSetController.ApplyTo(&c.ComponentConfig.LoadBalancerSetController); err != nil { + return err + } return nil } diff --git a/cmd/yurt-manager/app/options/poolservicecontroller.go b/cmd/yurt-manager/app/options/poolservicecontroller.go deleted file mode 100644 index d5b8c544050..00000000000 --- a/cmd/yurt-manager/app/options/poolservicecontroller.go +++ /dev/null @@ -1,60 +0,0 @@ -/* -Copyright 2023 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the License); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an AS IS BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package options - -import ( - "github.com/spf13/pflag" - - "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/poolservice/config" -) - -type PoolServiceControllerOptions struct { - *config.PoolServiceControllerConfiguration -} - -func NewPoolServiceControllerOptions() *PoolServiceControllerOptions { - return &PoolServiceControllerOptions{ - &config.PoolServiceControllerConfiguration{}, - } -} - -// AddFlags adds flags related to poolservice for yurt-manager to the specified FlagSet. -func (n *PoolServiceControllerOptions) AddFlags(fs *pflag.FlagSet) { - if n == nil { - return - } - - //fs.BoolVar(&n.CreateDefaultPool, "create-default-pool", n.CreateDefaultPool, "Create default cloud/edge pools if indicated.") -} - -// ApplyTo fills up poolservice config with options. -func (o *PoolServiceControllerOptions) ApplyTo(cfg *config.PoolServiceControllerConfiguration) error { - if o == nil { - return nil - } - - return nil -} - -// Validate checks validation of PoolServiceControllerOptions. -func (o *PoolServiceControllerOptions) Validate() []error { - if o == nil { - return nil - } - errs := []error{} - return errs -} diff --git a/cmd/yurt-manager/names/controller_names.go b/cmd/yurt-manager/names/controller_names.go index 5b2e3b8785c..4001aa0cee1 100644 --- a/cmd/yurt-manager/names/controller_names.go +++ b/cmd/yurt-manager/names/controller_names.go @@ -36,7 +36,7 @@ const ( GatewayDNSController = "gateway-dns-controller" NodeLifeCycleController = "node-life-cycle-controller" NodeBucketController = "node-bucket-controller" - PoolServiceController = "pool-service-controller" + LoadBalancerSetController = "load-balancer-set-controller" ) func YurtManagerControllerAliases() map[string]string { @@ -61,5 +61,6 @@ func YurtManagerControllerAliases() map[string]string { "gatewaydns": GatewayDNSController, "nodelifecycle": NodeLifeCycleController, "nodebucket": NodeBucketController, + "loadbalancerset": LoadBalancerSetController, } } diff --git a/pkg/apis/network/v1alpha1/groupversion_info.go b/pkg/apis/network/v1alpha1/groupversion_info.go index 8e9f14f65b8..8d6462f2c86 100644 --- a/pkg/apis/network/v1alpha1/groupversion_info.go +++ b/pkg/apis/network/v1alpha1/groupversion_info.go @@ -16,9 +16,9 @@ limitations under the License. package v1alpha1 -// Package v1alpha1 contains API Schema definitions for the net v1alpha1API group +// Package v1alpha1 contains API Schema definitions for the network v1alpha1API group // +kubebuilder:object:generate=true -// +groupName=net.openyurt.io +// +groupName=network.openyurt.io import ( "k8s.io/apimachinery/pkg/runtime/schema" diff --git a/pkg/apis/network/v1alpha1/poolservice_types.go b/pkg/apis/network/v1alpha1/poolservice_types.go index 417751d3876..6a7be0e0ce7 100644 --- a/pkg/apis/network/v1alpha1/poolservice_types.go +++ b/pkg/apis/network/v1alpha1/poolservice_types.go @@ -33,7 +33,7 @@ type PoolServiceSpec struct { // PoolServiceStatus defines the observed state of PoolService type PoolServiceStatus struct { // LoadBalancer contains the current status of the load-balancer in the current nodepool - LoadBalancer *v1.LoadBalancerStatus `json:"loadBalancer,omitempty"` + LoadBalancer v1.LoadBalancerStatus `json:"loadBalancer,omitempty"` // Current poolService state Conditions []metav1.Condition `json:"conditions,omitempty"` diff --git a/pkg/apis/network/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/network/v1alpha1/zz_generated.deepcopy.go index 7f4d8450bc2..1041785c993 100644 --- a/pkg/apis/network/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/network/v1alpha1/zz_generated.deepcopy.go @@ -2,7 +2,7 @@ // +build !ignore_autogenerated /* -Copyright 2024 The OpenYurt Authors. +Copyright 2023 The OpenYurt Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -22,8 +22,7 @@ limitations under the License. package v1alpha1 import ( - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -109,14 +108,10 @@ func (in *PoolServiceSpec) DeepCopy() *PoolServiceSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PoolServiceStatus) DeepCopyInto(out *PoolServiceStatus) { *out = *in - if in.LoadBalancer != nil { - in, out := &in.LoadBalancer, &out.LoadBalancer - *out = new(v1.LoadBalancerStatus) - (*in).DeepCopyInto(*out) - } + in.LoadBalancer.DeepCopyInto(&out.LoadBalancer) if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make([]metav1.Condition, len(*in)) + *out = make([]v1.Condition, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } diff --git a/pkg/yurtmanager/controller/apis/config/types.go b/pkg/yurtmanager/controller/apis/config/types.go index 6f2c82651cc..18f7642c006 100644 --- a/pkg/yurtmanager/controller/apis/config/types.go +++ b/pkg/yurtmanager/controller/apis/config/types.go @@ -23,10 +23,10 @@ import ( csrapproverconfig "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/csrapprover/config" daemonpodupdaterconfig "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/daemonpodupdater/config" + loadbalancersetconfig "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/config" nodebucketconfig "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/nodebucket/config" nodepoolconfig "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/nodepool/config" platformadminconfig "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/platformadmin/config" - poolserviceconfig "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/poolservice/config" gatewaypickupconfig "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/gatewaypickup/config" yurtappdaemonconfig "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/yurtappdaemon/config" yurtappoverriderconfig "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/yurtappoverrider/config" @@ -79,8 +79,8 @@ type YurtManagerConfiguration struct { // NodeBucketController holds configuration for NodeBucketController related features. NodeBucketController nodebucketconfig.NodeBucketControllerConfiguration - // PoolServiceController holds configuration for PoolServiceController related features. - PoolServiceController poolserviceconfig.PoolServiceControllerConfiguration + // LoadBalancerSetController holds configuration for LoadBalancerSetController related features. + LoadBalancerSetController loadbalancersetconfig.LoadBalancerSetControllerConfiguration } type GenericConfiguration struct { diff --git a/pkg/yurtmanager/controller/controller.go b/pkg/yurtmanager/controller/controller.go index d1fdefbb945..a6cb06f999b 100644 --- a/pkg/yurtmanager/controller/controller.go +++ b/pkg/yurtmanager/controller/controller.go @@ -32,6 +32,7 @@ import ( "github.com/openyurtio/openyurt/cmd/yurt-manager/names" "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/csrapprover" "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/daemonpodupdater" + "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset" "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/nodebucket" "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/nodelifecycle" "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/nodepool" @@ -97,6 +98,7 @@ func NewControllerInitializers() map[string]InitFunc { register(names.GatewayPublicServiceController, gatewaypublicservice.Add) register(names.NodeLifeCycleController, nodelifecycle.Add) register(names.NodeBucketController, nodebucket.Add) + register(names.LoadBalancerSetController, loadbalancerset.Add) return controllers } diff --git a/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/annotations.go b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/annotations.go new file mode 100644 index 00000000000..8dcd8f5569a --- /dev/null +++ b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/annotations.go @@ -0,0 +1,167 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadbalancerset + +import ( + "reflect" + "sort" + "strings" + + corev1 "k8s.io/api/core/v1" + + "github.com/openyurtio/openyurt/pkg/apis/network" + netv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1" +) + +func aggregatePoolServicesAnnotations(poolServices []netv1alpha1.PoolService) map[string]string { + aggregatedAnnotations := make(map[string]string) + for _, ps := range poolServices { + aggregatedAnnotations = mergeAnnotations(aggregatedAnnotations, filterIgnoredKeys(ps.Annotations)) + } + + return aggregatedAnnotations +} + +func filterIgnoredKeys(annotations map[string]string) map[string]string { + newAnnotations := make(map[string]string) + for key, value := range annotations { + if key == network.AnnotationNodePoolSelector { + continue + } + if !strings.HasPrefix(key, network.AggregateAnnotationsKeyPrefix) { + continue + } + newAnnotations[key] = value + } + return newAnnotations +} + +func mergeAnnotations(m map[string]string, elem map[string]string) map[string]string { + if len(elem) == 0 { + return m + } + + if m == nil { + m = make(map[string]string) + } + + for k, v := range elem { + m[k] = mergeAnnotationValue(m[k], v) + } + + return m +} + +func mergeAnnotationValue(originalValue, addValue string) string { + if len(originalValue) == 0 { + return addValue + } + + if len(addValue) == 0 { + return originalValue + } + + splitOriginalValues := strings.Split(originalValue, ",") + if valueIsExist(splitOriginalValues, addValue) { + return originalValue + } + + return joinNewValue(splitOriginalValues, addValue) +} + +func valueIsExist(originalValueList []string, addValue string) bool { + for _, oldValue := range originalValueList { + if addValue == oldValue { + return true + } + } + return false +} + +func joinNewValue(originalValueList []string, addValue string) string { + originalValueList = append(originalValueList, addValue) + sort.Strings(originalValueList) + + return strings.Join(originalValueList, ",") +} + +func compareAndUpdateServiceAnnotations(svc *corev1.Service, aggregatedAnnotations map[string]string) bool { + currentAggregatedServiceAnnotations := filterIgnoredKeys(svc.Annotations) + + if reflect.DeepEqual(currentAggregatedServiceAnnotations, aggregatedAnnotations) { + return false + } + + update, deletion := diffAnnotations(currentAggregatedServiceAnnotations, aggregatedAnnotations) + updateAnnotations(svc.Annotations, update, deletion) + + return true +} + +func diffAnnotations(currentAnnotations, desiredAnnotations map[string]string) (update map[string]string, deletion map[string]string) { + if currentAnnotations == nil { + return desiredAnnotations, nil + } + if desiredAnnotations == nil { + return nil, currentAnnotations + } + + update = make(map[string]string) + for key, value := range desiredAnnotations { + if currentAnnotations[key] != value { + update[key] = value + } + } + + deletion = make(map[string]string) + for key, value := range currentAnnotations { + if _, exist := desiredAnnotations[key]; !exist { + deletion[key] = value + } + } + return +} + +func updateAnnotations(annotations, update, deletion map[string]string) { + if len(update) == 0 && len(deletion) == 0 { + return + } + if annotations == nil { + annotations = make(map[string]string) + } + for key, value := range update { + annotations[key] = value + } + + for key := range deletion { + delete(annotations, key) + } +} + +func annotationValueIsEqual(oldAnnotations, newAnnotations map[string]string, key string) bool { + var oldValue string + if oldAnnotations != nil { + oldValue = oldAnnotations[key] + } + + var newValue string + if newAnnotations != nil { + newValue = newAnnotations[key] + } + + return oldValue == newValue +} diff --git a/pkg/yurtmanager/controller/poolservice/config/types.go b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/config/types.go similarity index 75% rename from pkg/yurtmanager/controller/poolservice/config/types.go rename to pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/config/types.go index e22bc13a5c8..bd5c0c0a9e6 100644 --- a/pkg/yurtmanager/controller/poolservice/config/types.go +++ b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/config/types.go @@ -16,6 +16,7 @@ limitations under the License. package config -// PoolServiceControllerConfiguration contains elements describing PoolServiceController. -type PoolServiceControllerConfiguration struct { +// LoadBalancerSetControllerConfiguration contains elements describing LoadBalancerSetController. +type LoadBalancerSetControllerConfiguration struct { + ConcurrentLoadBalancerSetWorkers int32 } diff --git a/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/event_handler.go b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/event_handler.go new file mode 100644 index 00000000000..532128b284d --- /dev/null +++ b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/event_handler.go @@ -0,0 +1,143 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadbalancerset + +import ( + "context" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/openyurtio/openyurt/cmd/yurt-manager/names" + "github.com/openyurtio/openyurt/pkg/apis/apps/v1beta1" + "github.com/openyurtio/openyurt/pkg/apis/network" + "github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1" +) + +func NewPoolServiceEventHandler() handler.EventHandler { + return handler.Funcs{ + CreateFunc: func(event event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) { + handlePoolServiceNormal(event.Object, limitingInterface) + }, + UpdateFunc: func(updateEvent event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) { + handlePoolServiceUpdate(updateEvent.ObjectOld, updateEvent.ObjectNew, limitingInterface) + }, + DeleteFunc: func(deleteEvent event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) { + handlePoolServiceNormal(deleteEvent.Object, limitingInterface) + }, + GenericFunc: func(genericEvent event.GenericEvent, limitingInterface workqueue.RateLimitingInterface) { + handlePoolServiceNormal(genericEvent.Object, limitingInterface) + }, + } +} + +func handlePoolServiceNormal(event client.Object, q workqueue.RateLimitingInterface) { + ps := event.(*v1alpha1.PoolService) + serviceName := getServiceNameFromPoolService(ps) + enqueueService(ps.Namespace, serviceName, q) +} + +func getServiceNameFromPoolService(poolService *v1alpha1.PoolService) string { + if poolService.Labels == nil { + return "" + } + + return poolService.Labels[network.LabelServiceName] +} + +func enqueueService(namespace, serviceName string, q workqueue.RateLimitingInterface) { + if len(serviceName) == 0 || len(namespace) == 0 { + return + } + + q.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{Namespace: namespace, Name: serviceName}, + }) +} + +func handlePoolServiceUpdate(oldObject, newObject client.Object, q workqueue.RateLimitingInterface) { + oldPs := oldObject.(*v1alpha1.PoolService) + newPs := newObject.(*v1alpha1.PoolService) + + oldServiceName := getServiceNameFromPoolService(oldPs) + newServiceName := getServiceNameFromPoolService(newPs) + + if oldServiceName != newServiceName { + klog.Warningf("service name of %s/%s is changed from %s to %s", oldPs.Namespace, oldPs.Name, oldServiceName, newServiceName) + enqueueService(oldPs.Namespace, oldServiceName, q) + enqueueService(newPs.Namespace, newServiceName, q) + return + } + enqueueService(newPs.Namespace, newServiceName, q) + + return +} + +func NewNodePoolEventHandler(c client.Client) handler.EventHandler { + return handler.Funcs{ + CreateFunc: func(createEvent event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) { + allLoadBalancerSetServicesEnqueue(c, limitingInterface) + }, + UpdateFunc: func(updateEvent event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) { + allLoadBalancerSetServicesEnqueue(c, limitingInterface) + }, + DeleteFunc: func(deleteEvent event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) { + nodePoolRelatedServiceEnqueue(c, deleteEvent.Object, limitingInterface) + }, + GenericFunc: func(genericEvent event.GenericEvent, limitingInterface workqueue.RateLimitingInterface) { + nodePoolRelatedServiceEnqueue(c, genericEvent.Object, limitingInterface) + }, + } +} + +func allLoadBalancerSetServicesEnqueue(c client.Client, q workqueue.RateLimitingInterface) { + services := &v1.ServiceList{} + err := c.List(context.Background(), services) + if err != nil { + return + } + + for _, svc := range services.Items { + if !isLoadBalancerSetService(&svc) { + continue + } + enqueueService(svc.Namespace, svc.Name, q) + } +} + +func nodePoolRelatedServiceEnqueue(c client.Client, object client.Object, q workqueue.RateLimitingInterface) { + np := object.(*v1beta1.NodePool) + poolServiceList := &v1alpha1.PoolServiceList{} + + listSelector := client.MatchingLabels{ + network.LabelNodePoolName: np.Name, + labelManageBy: names.LoadBalancerSetController} + + if err := c.List(context.Background(), poolServiceList, listSelector); err != nil { + return + } + + for _, item := range poolServiceList.Items { + handlePoolServiceNormal(&item, q) + } +} diff --git a/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/event_handler_test.go b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/event_handler_test.go new file mode 100644 index 00000000000..c383adbc532 --- /dev/null +++ b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/event_handler_test.go @@ -0,0 +1,211 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadbalancerset + +import ( + "testing" + + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/openyurtio/openyurt/cmd/yurt-manager/names" + "github.com/openyurtio/openyurt/pkg/apis/network" + "github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1" +) + +func TestPoolServiceEventHandler(t *testing.T) { + f := NewPoolServiceEventHandler() + t.Run("create pool service", func(t *testing.T) { + ps := newPoolServiceWithServiceNameAndNodepoolName(mockServiceName, "np123") + q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pool_services") + + f.Create(event.CreateEvent{ + Object: ps, + }, q) + + assertAndDoneQueue(t, q, []string{v1.NamespaceDefault + "/" + mockServiceName}) + }) + + t.Run("create pool service not service name", func(t *testing.T) { + ps := newPoolServiceWithServiceNameAndNodepoolName(mockServiceName, "np123") + delete(ps.Labels, network.LabelServiceName) + q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pool_services") + + f.Create(event.CreateEvent{ + Object: ps, + }, q) + assertAndDoneQueue(t, q, []string{}) + }) + + t.Run("update service name with pool service", func(t *testing.T) { + oldPs := newPoolServiceWithServiceNameAndNodepoolName("mock1", "np123") + newPs := newPoolServiceWithServiceNameAndNodepoolName("mock2", "np123") + + q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pool_services") + + f.Update(event.UpdateEvent{ObjectOld: oldPs, ObjectNew: newPs}, q) + + assertAndDoneQueue(t, q, []string{ + v1.NamespaceDefault + "/" + "mock1", + v1.NamespaceDefault + "/" + "mock2", + }) + }) + + t.Run("delete service name with pool service", func(t *testing.T) { + oldPs := newPoolServiceWithServiceNameAndNodepoolName("mock1", "np123") + newPs := newPoolServiceWithServiceNameAndNodepoolName("mock1", "np123") + delete(newPs.Labels, network.LabelServiceName) + + q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pool_services") + + f.Update(event.UpdateEvent{ObjectOld: oldPs, ObjectNew: newPs}, q) + + assertAndDoneQueue(t, q, []string{v1.NamespaceDefault + "/" + "mock1"}) + }) + + t.Run("update pool service annotations", func(t *testing.T) { + oldPs := newPoolServiceWithServiceNameAndNodepoolName(mockServiceName, "np123") + newPs := newPoolServiceWithServiceNameAndNodepoolName(mockServiceName, "np123") + newPs.Annotations = map[string]string{"test": "app"} + + q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pool_services") + + f.Update(event.UpdateEvent{ + ObjectOld: oldPs, + ObjectNew: newPs, + }, q) + + assertAndDoneQueue(t, q, []string{v1.NamespaceDefault + "/" + mockServiceName}) + }) + +} + +func newPoolServiceWithServiceNameAndNodepoolName(serviceName string, poolName string) *v1alpha1.PoolService { + return &v1alpha1.PoolService{ + TypeMeta: v1.TypeMeta{ + Kind: "PoolService", + APIVersion: v1alpha1.GroupVersion.String(), + }, + ObjectMeta: v1.ObjectMeta{ + Namespace: v1.NamespaceDefault, + Name: serviceName + "-" + poolName, + Labels: map[string]string{network.LabelServiceName: serviceName, network.LabelNodePoolName: poolName, labelManageBy: names.LoadBalancerSetController}, + }, + Spec: v1alpha1.PoolServiceSpec{ + LoadBalancerClass: &elbClass, + }, + } +} + +func assertAndDoneQueue(t testing.TB, q workqueue.Interface, expectedItemNames []string) { + t.Helper() + + if q.Len() != len(expectedItemNames) { + t.Errorf("expected queue %d, but got %d", len(expectedItemNames), q.Len()) + return + } + + for _, expectedItem := range expectedItemNames { + gotItem, _ := q.Get() + r, ok := gotItem.(reconcile.Request) + + if !ok { + t.Errorf("expected item is reconcile request, but not") + } + + if r.String() != expectedItem { + t.Errorf("expected request is %s, but got %s", expectedItem, r.String()) + } + q.Done(gotItem) + } +} + +func TestNodePoolEventHandler(t *testing.T) { + scheme := initScheme(t) + + t.Run("create/update nodepool enqueue all multi lb service", func(t *testing.T) { + np := newNodepool("np123", "name=np123") + + svc1 := newService(v1.NamespaceDefault, mockServiceName) + svc2 := newService(v1.NamespaceDefault, "mock") + svc2.Spec.Type = corev1.ServiceTypeClusterIP + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc1).WithObjects(svc2).Build() + f := NewNodePoolEventHandler(c) + q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pool_services") + + f.Create(event.CreateEvent{Object: np}, q) + assertAndDoneQueue(t, q, []string{v1.NamespaceDefault + "/" + mockServiceName}) + + f.Update(event.UpdateEvent{ObjectOld: np, ObjectNew: np}, q) + assertAndDoneQueue(t, q, []string{v1.NamespaceDefault + "/" + mockServiceName}) + }) + + t.Run("delete/generic need enqueue", func(t *testing.T) { + ps1 := newPoolServiceWithServiceNameAndNodepoolName("mock1", "np123") + ps2 := newPoolServiceWithServiceNameAndNodepoolName("mock2", "np123") + ps3 := newPoolServiceWithServiceNameAndNodepoolName("mock3", "np123") + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(ps1).WithObjects(ps2).WithObjects(ps3).Build() + + f := NewNodePoolEventHandler(c) + + np := newNodepool("np123", "name=np123,app=deploy") + q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pool_services") + + f.Delete(event.DeleteEvent{Object: np}, q) + assertAndDoneQueue(t, q, []string{ + v1.NamespaceDefault + "/" + "mock1", + v1.NamespaceDefault + "/" + "mock2", + v1.NamespaceDefault + "/" + "mock3", + }) + + f.Generic(event.GenericEvent{Object: np}, q) + assertAndDoneQueue(t, q, []string{ + v1.NamespaceDefault + "/" + "mock1", + v1.NamespaceDefault + "/" + "mock2", + v1.NamespaceDefault + "/" + "mock3", + }) + }) + + t.Run("update/delete/generic not enqueue", func(t *testing.T) { + ps := newPoolServiceWithServiceNameAndNodepoolName(mockServiceName, "np123") + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(ps).Build() + + f := NewNodePoolEventHandler(c) + + np := newNodepool("np234", "name=np234,app=deploy") + q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pool_services") + + f.Create(event.CreateEvent{Object: np}, q) + assertAndDoneQueue(t, q, []string{}) + + f.Update(event.UpdateEvent{ObjectOld: np, ObjectNew: np}, q) + assertAndDoneQueue(t, q, []string{}) + + f.Delete(event.DeleteEvent{Object: np}, q) + assertAndDoneQueue(t, q, []string{}) + + f.Generic(event.GenericEvent{Object: np}, q) + assertAndDoneQueue(t, q, []string{}) + }) + +} diff --git a/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/finalizer.go b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/finalizer.go new file mode 100644 index 00000000000..585e5184dd5 --- /dev/null +++ b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/finalizer.go @@ -0,0 +1,46 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadbalancerset + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +const ( + poolServiceFinalizer = "poolservice.openyurt.io/resources" +) + +func (r *ReconcileLoadBalancerSet) addFinalizer(svc *corev1.Service) error { + if controllerutil.ContainsFinalizer(svc, poolServiceFinalizer) { + return nil + } + + controllerutil.AddFinalizer(svc, poolServiceFinalizer) + return r.Update(context.Background(), svc) +} + +func (r *ReconcileLoadBalancerSet) removeFinalizer(svc *corev1.Service) error { + if !controllerutil.ContainsFinalizer(svc, poolServiceFinalizer) { + return nil + } + + controllerutil.RemoveFinalizer(svc, poolServiceFinalizer) + return r.Update(context.Background(), svc) +} diff --git a/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/loadbalancer_status.go b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/loadbalancer_status.go new file mode 100644 index 00000000000..c766611d839 --- /dev/null +++ b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/loadbalancer_status.go @@ -0,0 +1,49 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadbalancerset + +import ( + "reflect" + "sort" + + corev1 "k8s.io/api/core/v1" + + netv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1" +) + +func aggregateLbStatus(poolServices []netv1alpha1.PoolService) corev1.LoadBalancerStatus { + var lbIngress []corev1.LoadBalancerIngress + for _, cps := range poolServices { + lbIngress = append(lbIngress, cps.Status.LoadBalancer.Ingress...) + } + + sort.Slice(lbIngress, func(i, j int) bool { + return lbIngress[i].IP < lbIngress[j].IP + }) + + return corev1.LoadBalancerStatus{ + Ingress: lbIngress, + } +} + +func compareAndUpdateServiceLbStatus(svc *corev1.Service, lbStatus corev1.LoadBalancerStatus) bool { + if !reflect.DeepEqual(svc.Status.LoadBalancer, lbStatus) { + svc.Status.LoadBalancer = lbStatus + return true + } + return false +} diff --git a/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/loadbalancerset_controller.go b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/loadbalancerset_controller.go new file mode 100644 index 00000000000..b9813d412d5 --- /dev/null +++ b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/loadbalancerset_controller.go @@ -0,0 +1,507 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadbalancerset + +import ( + "context" + "fmt" + "reflect" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + appconfig "github.com/openyurtio/openyurt/cmd/yurt-manager/app/config" + "github.com/openyurtio/openyurt/cmd/yurt-manager/names" + "github.com/openyurtio/openyurt/pkg/apis/apps/v1beta1" + "github.com/openyurtio/openyurt/pkg/apis/network" + netv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1" + "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/config" +) + +const ( + labelManageBy = "poolservice.openyurt.io/managed-by" + + poolServiceModifiedEventMsgFormat = "PoolService %s/%s resource is manually modified, the controller will overwrite this modification" + poolServiceManagedConflictEventMsgFormat = "PoolService %s/%s is not managed by pool-service-controller, but the nodepool-labelselector of service %s/%s include it" +) + +var ( + poolServicesControllerResource = netv1alpha1.SchemeGroupVersion.WithResource("poolservices") + nodepoolsControllerResource = v1beta1.SchemeGroupVersion.WithResource("nodepools") +) + +func Format(format string, args ...interface{}) string { + s := fmt.Sprintf(format, args...) + return fmt.Sprintf("%s: %s", names.LoadBalancerSetController, s) +} + +// Add creates a new PoolService Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller +// and Start it when the Manager is Started. +func Add(ctx context.Context, c *appconfig.CompletedConfig, mgr manager.Manager) error { + klog.Infof(Format("loadbalancerset-controller add controller %s", poolServicesControllerResource.String())) + r := newReconciler(c, mgr) + + if _, err := r.mapper.KindFor(poolServicesControllerResource); err != nil { + return errors.Errorf("resource %s isn't exist", poolServicesControllerResource.String()) + } + if _, err := r.mapper.KindFor(nodepoolsControllerResource); err != nil { + return errors.Errorf("resource %s isn't exist", nodepoolsControllerResource.String()) + } + + return add(mgr, c, r) +} + +var _ reconcile.Reconciler = &ReconcileLoadBalancerSet{} + +// ReconcileLoadBalancerSet reconciles a PoolService object +type ReconcileLoadBalancerSet struct { + client.Client + scheme *runtime.Scheme + recorder record.EventRecorder + mapper meta.RESTMapper + + configration config.LoadBalancerSetControllerConfiguration +} + +// newReconciler returns a new reconcile.Reconciler +func newReconciler(c *appconfig.CompletedConfig, mgr manager.Manager) *ReconcileLoadBalancerSet { + return &ReconcileLoadBalancerSet{ + Client: mgr.GetClient(), + scheme: mgr.GetScheme(), + mapper: mgr.GetRESTMapper(), + recorder: mgr.GetEventRecorderFor(names.LoadBalancerSetController), + configration: c.ComponentConfig.LoadBalancerSetController, + } +} + +// add adds a new Controller to mgr with r as the reconcile.Reconciler +func add(mgr manager.Manager, cfg *appconfig.CompletedConfig, r reconcile.Reconciler) error { + // Create a new controller + c, err := controller.New(names.LoadBalancerSetController, mgr, controller.Options{ + Reconciler: r, MaxConcurrentReconciles: int(cfg.ComponentConfig.LoadBalancerSetController.ConcurrentLoadBalancerSetWorkers), + }) + if err != nil { + return err + } + + // Watch for changes to PoolService + err = c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForObject{}, NewServicePredicated()) + if err != nil { + return err + } + + err = c.Watch(&source.Kind{Type: &netv1alpha1.PoolService{}}, NewPoolServiceEventHandler(), NewPoolServicePredicated()) + if err != nil { + return err + } + + err = c.Watch(&source.Kind{Type: &v1beta1.NodePool{}}, NewNodePoolEventHandler(mgr.GetClient()), NewNodePoolPredicated()) + if err != nil { + return err + } + + return nil +} + +// +kubebuilder:rbac:groups=network.openyurt.io,resources=poolservices,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=network.openyurt.io,resources=poolservices/status,verbs=get;update;patch + +// Reconcile reads that state of the cluster for a PoolService object and makes changes based on the state read +// and what is in the PoolService.Spec +func (r *ReconcileLoadBalancerSet) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) { + + // Note !!!!!!!!!! + // We strongly recommend use Format() to encapsulation because Format() can print logs by module + // @kadisi + klog.Infof(Format("Reconcile PoolService %s/%s", request.Namespace, request.Name)) + + service := &corev1.Service{} + err := r.Get(context.TODO(), request.NamespacedName, service) + if apierrors.IsNotFound(err) { + return reconcile.Result{}, nil + } + if err != nil { + return reconcile.Result{}, errors.Wrapf(err, "failed to get service %s", request.String()) + } + + copySvc := service.DeepCopy() + + if shouldDeleteAllPoolServices(copySvc) { + if err := r.deleteAllPoolServices(copySvc); err != nil { + return reconcile.Result{}, errors.Wrapf(err, "failed to clean pool services") + } + return reconcile.Result{}, nil + } + + if err := r.reconcilePoolServices(copySvc); err != nil { + return reconcile.Result{}, errors.Wrapf(err, "failed to reconcile service") + } + + if err := r.syncService(copySvc); err != nil { + return reconcile.Result{}, errors.Wrapf(err, "failed to sync service %s/%s", copySvc.Namespace, copySvc.Name) + } + + return reconcile.Result{}, nil +} + +func shouldDeleteAllPoolServices(svc *corev1.Service) bool { + if !svc.DeletionTimestamp.IsZero() { + return true + } + + if !isLoadBalancerSetService(svc) { + return true + } + + return false +} + +func (r *ReconcileLoadBalancerSet) deleteAllPoolServices(svc *corev1.Service) error { + currentPoolServices, err := r.currentPoolServices(svc) + if err != nil { + return errors.Wrapf(err, "failed to get current pool services for service %s/%s", svc.Namespace, svc.Name) + } + + if canRemoveFinalizer(currentPoolServices) { + if err := r.removeFinalizer(svc); err != nil { + return errors.Wrapf(err, "failed to remove finalizer") + } + return nil + } + + if err := r.deletePoolServices(currentPoolServices); err != nil { + return errors.Wrapf(err, "failed to delete all pool services for service %s/%s", svc.Namespace, svc.Name) + } + + return nil +} + +func (r *ReconcileLoadBalancerSet) currentPoolServices(svc *corev1.Service) ([]netv1alpha1.PoolService, error) { + listSelector := &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + network.LabelServiceName: svc.Name, + labelManageBy: names.LoadBalancerSetController}), + Namespace: svc.Namespace, + } + + poolServiceList := &netv1alpha1.PoolServiceList{} + if err := r.List(context.TODO(), poolServiceList, listSelector); err != nil { + return nil, errors.Wrapf(err, "failed to list pool service with %s/%s", svc.Namespace, svc.Name) + } + + return filterInvalidPoolService(poolServiceList.Items), nil +} + +func filterInvalidPoolService(poolServices []netv1alpha1.PoolService) []netv1alpha1.PoolService { + var filteredPoolServices []netv1alpha1.PoolService + for _, item := range poolServices { + if !isValidPoolService(&item) { + klog.Warningf("Pool Service %s/%s is not valid, label %s or %s is modified", + item.Namespace, item.Name, network.LabelServiceName, network.LabelNodePoolName) + continue + } + filteredPoolServices = append(filteredPoolServices, item) + } + return filteredPoolServices +} + +func isValidPoolService(poolService *netv1alpha1.PoolService) bool { + if poolService.Labels == nil { + return false + } + if poolService.Labels[network.LabelServiceName]+"-"+poolService.Labels[network.LabelNodePoolName] != poolService.Name { + return false + } + return true +} + +func canRemoveFinalizer(poolServices []netv1alpha1.PoolService) bool { + return len(poolServices) == 0 +} + +func (r *ReconcileLoadBalancerSet) deletePoolServices(poolServices []netv1alpha1.PoolService) error { + for _, ps := range poolServices { + if !ps.DeletionTimestamp.IsZero() { + continue + } + + if err := r.Delete(context.Background(), &ps); err != nil { + return errors.Wrapf(err, "failed to delete poolservice %s/%s", ps.Namespace, ps.Name) + } + } + return nil +} + +func (r *ReconcileLoadBalancerSet) reconcilePoolServices(svc *corev1.Service) error { + if err := r.addFinalizer(svc); err != nil { + return errors.Wrapf(err, "failed to add finalizer") + } + + if err := r.syncPoolServices(svc); err != nil { + return errors.Wrapf(err, "failed to sync pool services") + } + return nil +} + +func (r *ReconcileLoadBalancerSet) syncPoolServices(svc *corev1.Service) error { + currentPoolServices, err := r.currentPoolServices(svc) + if err != nil { + return errors.Wrapf(err, "failed to get current pool services for service %s/%s", svc.Namespace, svc.Name) + } + + desiredPoolServices, err := r.desiredPoolServices(svc) + if err != nil { + return errors.Wrapf(err, "failed to calculate desire pool services for service %s/%s", svc.Namespace, svc.Name) + } + poolServicesToApply, poolServicesToDelete := r.diffPoolServices(desiredPoolServices, currentPoolServices) + + if err := r.deletePoolServices(poolServicesToDelete); err != nil { + return errors.Wrapf(err, "failed to delete pool services %v", poolServicesToDelete) + } + + if err := r.applyPoolServices(poolServicesToApply); err != nil { + return errors.Wrapf(err, "failed to apply pool services %v", poolServicesToApply) + } + + return nil +} + +func (r *ReconcileLoadBalancerSet) desiredPoolServices(svc *corev1.Service) ([]netv1alpha1.PoolService, error) { + if !isLoadBalancerSetService(svc) { + klog.Warningf("service %s/%s is not multi regional service, set desire pool services is nil", svc.Namespace, svc.Name) + return nil, nil + } + + nps, err := r.listNodePoolsByLabelSelector(svc) + if err != nil { + return nil, errors.Wrapf(err, "failed to list nodepool with service") + } + + var pss []netv1alpha1.PoolService + for _, np := range nps { + pss = append(pss, buildPoolService(svc, &np)) + } + return pss, nil +} + +func (r *ReconcileLoadBalancerSet) listNodePoolsByLabelSelector(svc *corev1.Service) ([]v1beta1.NodePool, error) { + labelStr := svc.Annotations[network.AnnotationNodePoolSelector] + labelSelector, err := labels.Parse(labelStr) + if err != nil { + return nil, err + } + + npList := &v1beta1.NodePoolList{} + if err := r.List(context.Background(), npList, &client.ListOptions{LabelSelector: labelSelector}); err != nil { + return nil, err + } + + return filterDeletionNodePools(npList.Items), nil +} + +func filterDeletionNodePools(allItems []v1beta1.NodePool) []v1beta1.NodePool { + var filterItems []v1beta1.NodePool + + for _, item := range allItems { + if !item.DeletionTimestamp.IsZero() { + continue + } + filterItems = append(filterItems, item) + } + + return filterItems +} + +func buildPoolService(svc *corev1.Service, np *v1beta1.NodePool) netv1alpha1.PoolService { + isController, isBlockOwnerDeletion := true, true + return netv1alpha1.PoolService{ + TypeMeta: v1.TypeMeta{ + Kind: "PoolService", + APIVersion: netv1alpha1.GroupVersion.String(), + }, + ObjectMeta: v1.ObjectMeta{ + Namespace: svc.Namespace, + Name: svc.Name + "-" + np.Name, + Labels: map[string]string{network.LabelServiceName: svc.Name, network.LabelNodePoolName: np.Name, labelManageBy: names.LoadBalancerSetController}, + OwnerReferences: []v1.OwnerReference{ + { + APIVersion: svc.APIVersion, + Name: svc.Name, + Kind: svc.Kind, + UID: svc.UID, + Controller: &isController, + BlockOwnerDeletion: &isBlockOwnerDeletion, + }, + { + APIVersion: np.APIVersion, + Name: np.Name, + Kind: np.Kind, + UID: np.UID, + BlockOwnerDeletion: &isBlockOwnerDeletion, + }, + }, + }, + Spec: netv1alpha1.PoolServiceSpec{ + LoadBalancerClass: svc.Spec.LoadBalancerClass, + }, + } +} + +func (r *ReconcileLoadBalancerSet) diffPoolServices(desirePoolServices, currentPoolServices []netv1alpha1.PoolService) (applications []netv1alpha1.PoolService, deletions []netv1alpha1.PoolService) { + for _, dps := range desirePoolServices { + if exist := r.isPoolServicePresent(currentPoolServices, dps); !exist { + applications = append(applications, dps) + } + } + + for _, cps := range currentPoolServices { + if exist := r.isPoolServicePresent(desirePoolServices, cps); !exist { + deletions = append(deletions, cps) + } + } + + return +} + +func (r *ReconcileLoadBalancerSet) isPoolServicePresent(poolServices []netv1alpha1.PoolService, ps netv1alpha1.PoolService) bool { + for _, dps := range poolServices { + if dps.Name == ps.Name { + return true + } + } + return false +} + +func (r *ReconcileLoadBalancerSet) applyPoolServices(poolServices []netv1alpha1.PoolService) error { + for _, ps := range poolServices { + if err := r.applyPoolService(&ps); err != nil { + return errors.Wrapf(err, "failed to apply pool service %s/%s", ps.Namespace, ps.Name) + } + } + return nil +} + +func (r *ReconcileLoadBalancerSet) applyPoolService(poolService *netv1alpha1.PoolService) error { + currentPoolService, exist, err := r.tryGetPoolService(poolService.Namespace, poolService.Name) + if err != nil { + return errors.Wrapf(err, "failed to try get pool service %s/%s", poolService.Namespace, poolService.Name) + } + + if exist { + if err := r.compareAndUpdatePoolService(currentPoolService, poolService); err != nil { + return errors.Wrapf(err, "failed to compare and update pool service %s/%s", poolService.Namespace, poolService.Name) + } + return nil + } + + return r.Create(context.Background(), poolService, &client.CreateOptions{}) +} + +func (r *ReconcileLoadBalancerSet) tryGetPoolService(namespace, name string) (*netv1alpha1.PoolService, bool, error) { + currentPs := &netv1alpha1.PoolService{} + err := r.Get(context.Background(), types.NamespacedName{ + Namespace: namespace, + Name: name, + }, currentPs) + + if apierrors.IsNotFound(err) { + return nil, false, nil + } + return currentPs, true, err +} + +func (r *ReconcileLoadBalancerSet) compareAndUpdatePoolService(currentPoolService, desirePoolService *netv1alpha1.PoolService) error { + if currentPoolService.Labels[labelManageBy] != names.LoadBalancerSetController { + r.recorder.Eventf(currentPoolService, corev1.EventTypeWarning, "ManagedConflict", poolServiceManagedConflictEventMsgFormat, + currentPoolService.Namespace, currentPoolService.Name, currentPoolService.Namespace, desirePoolService.Labels[network.LabelServiceName]) + return nil + } + + isLabelUpdated := compareAndUpdatePoolServiceLabel(currentPoolService, desirePoolService.Labels) + isOwnerUpdated := compareAndUpdatePoolServiceOwners(currentPoolService, desirePoolService.OwnerReferences) + + if !isLabelUpdated && !isOwnerUpdated { + return nil + } + + r.recorder.Eventf(currentPoolService, corev1.EventTypeWarning, "Modified", poolServiceModifiedEventMsgFormat, currentPoolService.Namespace, currentPoolService.Name) + if err := r.Update(context.Background(), currentPoolService); err != nil { + return errors.Wrapf(err, "failed to update pool service") + } + + return nil +} + +func compareAndUpdatePoolServiceLabel(currentPoolService *netv1alpha1.PoolService, desireLabels map[string]string) bool { + isUpdate := false + if currentPoolService.Labels[network.LabelServiceName] != desireLabels[network.LabelServiceName] { + currentPoolService.Labels[network.LabelServiceName] = desireLabels[network.LabelServiceName] + isUpdate = true + } + + if currentPoolService.Labels[network.LabelNodePoolName] != desireLabels[network.LabelNodePoolName] { + currentPoolService.Labels[network.LabelNodePoolName] = desireLabels[network.LabelNodePoolName] + isUpdate = true + } + + return isUpdate +} + +func compareAndUpdatePoolServiceOwners(currentPoolService *netv1alpha1.PoolService, desireOwners []v1.OwnerReference) bool { + if !reflect.DeepEqual(currentPoolService.OwnerReferences, desireOwners) { + currentPoolService.OwnerReferences = desireOwners + return true + } + return false +} + +func (r *ReconcileLoadBalancerSet) syncService(svc *corev1.Service) error { + poolServices, err := r.currentPoolServices(svc) + if err != nil { + return errors.Wrapf(err, "failed to get current pool services for service %s/%s", svc.Namespace, svc.Name) + } + + aggregatedAnnotations := aggregatePoolServicesAnnotations(poolServices) + aggregatedLbStatus := aggregateLbStatus(poolServices) + + return r.compareAndUpdateService(svc, aggregatedAnnotations, aggregatedLbStatus) +} + +func (r *ReconcileLoadBalancerSet) compareAndUpdateService(svc *corev1.Service, annotations map[string]string, lbStatus corev1.LoadBalancerStatus) error { + isUpdatedAnnotations := compareAndUpdateServiceAnnotations(svc, annotations) + isUpdatedLbStatus := compareAndUpdateServiceLbStatus(svc, lbStatus) + + if !isUpdatedLbStatus && !isUpdatedAnnotations { + return nil + } + + return r.Update(context.Background(), svc) +} diff --git a/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/loadbalancerset_controller_test.go b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/loadbalancerset_controller_test.go new file mode 100644 index 00000000000..1fd243d0cb2 --- /dev/null +++ b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/loadbalancerset_controller_test.go @@ -0,0 +1,980 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadbalancerset + +import ( + "context" + "fmt" + "reflect" + "sort" + "strings" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/openyurtio/openyurt/cmd/yurt-manager/names" + "github.com/openyurtio/openyurt/pkg/apis" + "github.com/openyurtio/openyurt/pkg/apis/apps/v1beta1" + "github.com/openyurtio/openyurt/pkg/apis/network" + "github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1" +) + +const ( + mockServiceName = "test" + mockNodePoolLabel = "app=deploy" + mockServiceUid = "c0af506a-7096-4ef9-b39a-eac2feb5c07g" + mockNodePoolUid = "f47dd9db-d3bc-40f3-8d03-7409930b6289" +) + +var ( + elbClass = "elb" +) + +func TestReconcilePoolService_Reconcile(t *testing.T) { + scheme := initScheme(t) + + t.Run("test get service not found", func(t *testing.T) { + rc := ReconcileLoadBalancerSet{ + Client: fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects().Build(), + } + + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + assertErrNil(t, err) + }) + + t.Run("test create pool services", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + np1 := newNodepool("np123", "name=np123,app=deploy") + np2 := newNodepool("np234", "name=np234,app=deploy") + np3 := newNodepool("np345", "name=np345") + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(np3).WithObjects(np2).Build() + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + psl := &v1alpha1.PoolServiceList{} + c.List(context.Background(), psl) + + assertErrNil(t, err) + assertPoolServicesNameList(t, psl, []string{"test-np123", "test-np234"}) + assertPoolServicesLBClass(t, psl, svc.Spec.LoadBalancerClass) + assertPoolServiceLabels(t, psl, svc.Name) + }) + + t.Run("test nodepool selector is nil", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + svc.Annotations[network.AnnotationNodePoolSelector] = "" + + np1 := newNodepool("np123", "name=np123,app=deploy") + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).Build() + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + psl := &v1alpha1.PoolServiceList{} + c.List(context.Background(), psl) + assertPoolServiceCounts(t, 0, len(psl.Items)) + }) + + t.Run("test create multi pool service in kube-system namespace", func(t *testing.T) { + svc := newService(v1.NamespaceSystem, mockServiceName) + np1 := newNodepool("np123", "name=np123,app=deploy") + np2 := newNodepool("np234", "name=np234,app=deploy") + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(np2).Build() + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceSystem, mockServiceName)) + + psl := &v1alpha1.PoolServiceList{} + err := c.List(context.Background(), psl) + + assertErrNil(t, err) + assertPoolServicesNameList(t, psl, []string{"test-np123", "test-np234"}) + assertPoolServicesLBClass(t, psl, svc.Spec.LoadBalancerClass) + }) + + t.Run("test delete pool services", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + np1 := newNodepool("np123", "name=np123,app=deploy") + np2 := newNodepool("np234", "name=np234") + + ps1 := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + ps2 := newPoolService(v1.NamespaceDefault, "np234", nil, nil) + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(np2).WithObjects(ps1).WithObjects(ps2).Build() + + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + psl := &v1alpha1.PoolServiceList{} + c.List(context.Background(), psl) + + assertErrNil(t, err) + assertPoolServicesNameList(t, psl, []string{"test-np123"}) + }) + + t.Run("test aggregate ingress status multi pool services", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + + np1 := newNodepool("np123", "name=np123,app=deploy") + np2 := newNodepool("np234", "name=np234,app=deploy") + + ps1 := newPoolService(v1.NamespaceDefault, "np123", nil, []corev1.LoadBalancerIngress{{IP: "2.2.3.4"}}) + ps2 := newPoolService(v1.NamespaceDefault, "np234", nil, []corev1.LoadBalancerIngress{{IP: "1.2.3.4"}}) + ps3 := newPoolService(v1.NamespaceSystem, "np234", nil, []corev1.LoadBalancerIngress{{IP: "3.4.5.6"}}) + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(np2).WithObjects(ps1).WithObjects(ps2).WithObjects(ps3).Build() + + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + assertErrNil(t, err) + + newSvc := &corev1.Service{} + c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: mockServiceName, + }, newSvc) + expectedStatus := []corev1.LoadBalancerIngress{{IP: "1.2.3.4"}, {IP: "2.2.3.4"}} + + assertLoadBalancerStatusEqual(t, expectedStatus, newSvc.Status.LoadBalancer.Ingress) + }) + + t.Run("test aggregate annotations multi pool services", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + + np1 := newNodepool("np123", "name=np123,app=deploy") + np2 := newNodepool("np234", "name=np234,app=deploy") + + ps1 := newPoolService(v1.NamespaceDefault, "np123", map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb34567"}, nil) + ps2 := newPoolService(v1.NamespaceDefault, "np234", map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb23456"}, nil) + ps3 := newPoolService(v1.NamespaceDefault, "np345", map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb12345"}, nil) + ps4 := newPoolService(v1.NamespaceDefault, "np456", map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb12345"}, nil) + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(np2). + WithObjects(ps1).WithObjects(ps2).WithObjects(ps3).WithObjects(ps4).Build() + + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + assertErrNil(t, err) + + newSvc := &corev1.Service{} + c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: mockServiceName, + }, newSvc) + + expectedAnnotations := map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb23456,lb34567"} + + assertOpenYurtAnnotationsEqual(t, expectedAnnotations, newSvc.Annotations) + assertResourceVersion(t, "1001", newSvc.ResourceVersion) + }) + + t.Run("test aggregate service not update", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + + np1 := newNodepool("np123", "name=np123,app=deploy") + np2 := newNodepool("np234", "name=np234,app=deploy") + + ps1 := newPoolService(v1.NamespaceDefault, "np123", map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb34567"}, nil) + ps2 := newPoolService(v1.NamespaceDefault, "np234", map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb23456"}, nil) + svc.Annotations[network.AggregateAnnotationsKeyPrefix+"/lb-id"] = "lb23456,lb34567" + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(np2). + WithObjects(ps1).WithObjects(ps2).Build() + + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + assertErrNil(t, err) + + newSvc := &corev1.Service{} + c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: mockServiceName, + }, newSvc) + + expectedAnnotations := map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb23456,lb34567"} + + assertOpenYurtAnnotationsEqual(t, expectedAnnotations, newSvc.Annotations) + assertResourceVersion(t, "1000", newSvc.ResourceVersion) + }) + + t.Run("test aggregate service annotations delete value", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + + np1 := newNodepool("np123", "name=np123,app=deploy") + np2 := newNodepool("np234", "name=np234,app=deploy") + + ps1 := newPoolService(v1.NamespaceDefault, "np123", map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb34567"}, nil) + svc.Annotations[network.AggregateAnnotationsKeyPrefix+"/lb-id"] = "lb23456,lb34567" + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(np2). + WithObjects(ps1).Build() + + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + assertErrNil(t, err) + + newSvc := &corev1.Service{} + c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: mockServiceName, + }, newSvc) + + expectedAnnotations := map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb34567"} + + assertOpenYurtAnnotationsEqual(t, expectedAnnotations, newSvc.Annotations) + assertResourceVersion(t, "1001", newSvc.ResourceVersion) + }) + + t.Run("test aggregate annotations delete key", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + + np1 := newNodepool("np123", "name=np123,app=deploy") + np2 := newNodepool("np234", "name=np234,app=deploy") + + svc.Annotations[network.AggregateAnnotationsKeyPrefix+"/lb-id"] = "lb23456,lb34567" + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(np2).Build() + + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + assertErrNil(t, err) + + newSvc := &corev1.Service{} + c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: mockServiceName, + }, newSvc) + + expectedAnnotations := map[string]string{} + + assertOpenYurtAnnotationsEqual(t, expectedAnnotations, newSvc.Annotations) + assertResourceVersion(t, "1001", newSvc.ResourceVersion) + }) + + t.Run("add service finalizer", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).Build() + + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + newSvc := &corev1.Service{} + c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: mockServiceName, + }, newSvc) + + assertErrNil(t, err) + + assertResourceVersion(t, "1000", newSvc.ResourceVersion) + assertFinalizerExist(t, newSvc) + }) + + t.Run("don't need to add service finalizer", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + controllerutil.AddFinalizer(svc, poolServiceFinalizer) + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).Build() + + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + newSvc := &corev1.Service{} + c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: mockServiceName, + }, newSvc) + + assertErrNil(t, err) + + assertResourceVersion(t, "999", newSvc.ResourceVersion) + assertFinalizerExist(t, newSvc) + }) + + t.Run("delete service finalizer", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + controllerutil.AddFinalizer(svc, poolServiceFinalizer) + svc.DeletionTimestamp = &v1.Time{Time: time.Now()} + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).Build() + + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + newSvc := &corev1.Service{} + err := c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: mockServiceName, + }, newSvc) + + assertNotFountError(t, err) + }) + + t.Run("don't delete/update service with delete time not zero", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + controllerutil.AddFinalizer(svc, poolServiceFinalizer) + svc.DeletionTimestamp = &v1.Time{Time: time.Now()} + + ps1 := newPoolService(v1.NamespaceDefault, "np123", map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb34567"}, nil) + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(ps1).Build() + + rc := ReconcileLoadBalancerSet{ + Client: c, + } + + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + assertErrNil(t, err) + + newSvc := &corev1.Service{} + err = c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: mockServiceName, + }, newSvc) + + assertErrNil(t, err) + assertFinalizerExist(t, newSvc) + + }) + + t.Run("clean pool services", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + controllerutil.AddFinalizer(svc, poolServiceFinalizer) + svc.DeletionTimestamp = &v1.Time{Time: time.Now()} + + ps1 := newPoolService(v1.NamespaceDefault, "np123", map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb34567"}, nil) + ps2 := newPoolService(v1.NamespaceDefault, "np234", map[string]string{network.AggregateAnnotationsKeyPrefix + "/lb-id": "lb45678"}, nil) + controllerutil.AddFinalizer(ps1, "elb.openyurt.io/resources") + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(ps1).WithObjects(ps2).Build() + + r := ReconcileLoadBalancerSet{ + Client: c, + } + + _, err := r.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + assertErrNil(t, err) + + newPs1 := &v1alpha1.PoolService{} + c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, Name: "test-np123"}, newPs1) + assertPoolServiceIsDeleting(t, newPs1) + assertResourceVersion(t, "1000", newPs1.ResourceVersion) + + newPs2 := &v1alpha1.PoolService{} + err = c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, Name: "test-np234"}, newPs2) + assertNotFountError(t, err) + }) + + t.Run("check pool service owner references", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + + np := newNodepool("np123", "name=np123,app=deploy") + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np).Build() + + rc := &ReconcileLoadBalancerSet{ + Client: c, + } + + rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + ps := &v1alpha1.PoolService{} + err := c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: mockServiceName + "-np123", + }, ps) + + assertErrNil(t, err) + + controller, blockOwnerDeletion := true, true + expectedOwnerReferences := []v1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Service", + Name: mockServiceName, + UID: mockServiceUid, + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: &controller, + }, + { + APIVersion: "apps.openyurt.io/v1beta1", + Kind: "NodePool", + Name: "np123", + UID: mockNodePoolUid, + BlockOwnerDeletion: &blockOwnerDeletion, + }, + } + + assertOwnerReferences(t, expectedOwnerReferences, ps.OwnerReferences) + }) + + t.Run("nodepool is deletion", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + np := newNodepool("np123", "name=np123,app=deploy") + np.DeletionTimestamp = &v1.Time{Time: time.Now()} + + ps := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np).WithObjects(ps).Build() + + r := ReconcileLoadBalancerSet{ + Client: c, + } + + r.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + psList := &v1alpha1.PoolServiceList{} + c.List(context.Background(), psList) + + assertPoolServiceCounts(t, 0, len(psList.Items)) + }) + + t.Run("modify service type to not LB", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + svc.Spec.Type = corev1.ServiceTypeClusterIP + np := newNodepool("np123", "name=np123,app=deploy") + ps := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np).WithObjects(ps).Build() + + r := ReconcileLoadBalancerSet{ + Client: c, + } + r.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + psList := &v1alpha1.PoolServiceList{} + c.List(context.Background(), psList) + + newSvc := &corev1.Service{} + c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: mockServiceName, + }, newSvc) + + assertPoolServiceCounts(t, 0, len(psList.Items)) + assertFinalizerNotExist(t, newSvc) + }) + + t.Run("have not pool selector annotation", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + delete(svc.Annotations, network.AnnotationNodePoolSelector) + + np := newNodepool("np123", "name=np123,app=deploy") + ps := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np).WithObjects(ps).Build() + + r := ReconcileLoadBalancerSet{ + Client: c, + } + r.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + psList := &v1alpha1.PoolServiceList{} + c.List(context.Background(), psList) + + assertPoolServiceCounts(t, 0, len(psList.Items)) + }) + + t.Run("not managed by controller", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + ps := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + delete(ps.Labels, labelManageBy) + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(ps).Build() + + r := ReconcileLoadBalancerSet{ + Client: c, + } + r.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + psList := &v1alpha1.PoolServiceList{} + c.List(context.Background(), psList) + + assertPoolServicesNameList(t, psList, []string{mockServiceName + "-np123"}) + }) + + t.Run("Modifying the service name to reference a non-existent service", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + + ps := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + ps.Labels[network.LabelServiceName] = "mock" + ps.OwnerReferences = nil + ps.Labels[network.LabelNodePoolName] = "np111" + + np := newNodepool("np123", "name=np123,app=deploy") + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(ps).WithObjects(np).Build() + recorder := &record.FakeRecorder{ + Events: make(chan string, 1), + } + + r := ReconcileLoadBalancerSet{ + Client: c, + recorder: recorder, + } + _, err := r.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + assertErrNil(t, err) + + newPs := &v1alpha1.PoolService{} + c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, Name: "test-np123", + }, newPs) + + assertString(t, mockServiceName, newPs.Labels[network.LabelServiceName]) + assertString(t, "np123", newPs.Labels[network.LabelNodePoolName]) + + controller, blockOwnerDeletion := true, true + expectedOwnerReferences := []v1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Service", + Name: mockServiceName, + UID: mockServiceUid, + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: &controller, + }, + { + APIVersion: "apps.openyurt.io/v1beta1", + Kind: "NodePool", + Name: "np123", + UID: mockNodePoolUid, + BlockOwnerDeletion: &blockOwnerDeletion, + }, + } + + assertOwnerReferences(t, expectedOwnerReferences, newPs.OwnerReferences) + + eve := <-recorder.Events + expected := fmt.Sprintf("%s %s %s%s", corev1.EventTypeWarning, "Modified", "PoolService default/test-np123 resource is manually modified, the controller will overwrite this modification", "") + assertString(t, expected, eve) + }) + + t.Run("Modifying the service name to reference a existent service", func(t *testing.T) { + svc1 := newService(v1.NamespaceDefault, mockServiceName) + svc2 := newService(v1.NamespaceDefault, "mock") + + ps := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + ps.Labels[network.LabelServiceName] = "mock" + ps.OwnerReferences = nil + ps.Labels[network.LabelNodePoolName] = "np111" + + np := newNodepool("np123", "name=np123,app=deploy") + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc1).WithObjects(svc2).WithObjects(ps).WithObjects(np).Build() + recorder := &record.FakeRecorder{ + Events: make(chan string, 1), + } + + r := ReconcileLoadBalancerSet{ + Client: c, + recorder: recorder, + } + _, err := r.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, "mock")) + + assertErrNil(t, err) + + newPs := &v1alpha1.PoolService{} + err = c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, Name: "test-np123", + }, newPs) + assertErrNil(t, err) + }) + + t.Run("modifying the nodepool name", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + np := newNodepool("np123", "name=np123,app=deploy") + + ps := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + ps.Labels[network.LabelNodePoolName] = "np111" + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(ps).WithObjects(np).Build() + recorder := record.NewFakeRecorder(1) + + r := &ReconcileLoadBalancerSet{ + Client: c, + recorder: recorder, + } + + _, err := r.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + assertErrNil(t, err) + + newPs := &v1alpha1.PoolService{} + c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: "test-np123", + }, newPs) + + assertString(t, newPs.Labels[network.LabelNodePoolName], "np123") + assertResourceVersion(t, "1000", newPs.ResourceVersion) + }) + + t.Run("modifying the pool service is not managed by controller", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + + ps := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + ps.Labels[network.LabelServiceName] = "mock" + delete(ps.Labels, labelManageBy) + + np := newNodepool("np123", "name=np123,app=deploy") + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(ps).WithObjects(np).Build() + recorder := &record.FakeRecorder{ + Events: make(chan string, 1), + } + + r := ReconcileLoadBalancerSet{ + Client: c, + recorder: recorder, + } + _, err := r.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockServiceName)) + + assertErrNil(t, err) + + newPs := &v1alpha1.PoolService{} + c.Get(context.Background(), types.NamespacedName{ + Namespace: v1.NamespaceDefault, Name: "test-np123", + }, newPs) + + assertString(t, newPs.Labels[network.LabelServiceName], "mock") + + eve := <-recorder.Events + expected := fmt.Sprintf("%s %s %s%s", corev1.EventTypeWarning, "ManagedConflict", "PoolService default/test-np123 is not managed by pool-service-controller, but the nodepool-labelselector of service default/test include it", "") + assertString(t, expected, eve) + }) +} + +func initScheme(t *testing.T) *runtime.Scheme { + scheme := runtime.NewScheme() + if err := clientgoscheme.AddToScheme(scheme); err != nil { + t.Fatal("Fail to add kubernetes clint-go custom resource") + } + apis.AddToScheme(scheme) + + return scheme +} + +func newReconcileRequest(namespace string, name string) reconcile.Request { + return reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: namespace, + Name: name, + }} +} + +func assertErrNil(t testing.TB, err error) { + t.Helper() + + if err != nil { + t.Errorf("expected err is nil, but got %v", err) + } +} + +func newService(namespace string, name string) *corev1.Service { + return &corev1.Service{ + TypeMeta: v1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Annotations: map[string]string{ + network.AnnotationNodePoolSelector: mockNodePoolLabel, + }, + Namespace: namespace, + Name: name, + UID: mockServiceUid, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + LoadBalancerClass: &elbClass, + }, + } +} + +func newNodepool(name string, labelStr string) *v1beta1.NodePool { + var splitLabels []string + if labelStr != "" { + splitLabels = strings.Split(labelStr, ",") + } + + labels := make(map[string]string) + for _, s := range splitLabels { + kv := strings.Split(s, "=") + labels[kv[0]] = kv[1] + } + + return &v1beta1.NodePool{ + TypeMeta: v1.TypeMeta{ + Kind: "NodePool", + APIVersion: "apps.openyurt.io/v1beta1", + }, + ObjectMeta: v1.ObjectMeta{ + Labels: labels, + Name: name, + UID: mockNodePoolUid, + }, + } +} + +func assertPoolServicesNameList(t testing.TB, psl *v1alpha1.PoolServiceList, expectedNameList []string) { + t.Helper() + + sort.Strings(expectedNameList) + + gotNameList := getPoolServicesNameList(t, psl) + + if !reflect.DeepEqual(expectedNameList, gotNameList) { + t.Errorf("expected name list %v, but got name list %v", expectedNameList, gotNameList) + } +} + +func getPoolServicesNameList(t testing.TB, psl *v1alpha1.PoolServiceList) []string { + t.Helper() + + var gotPoolServiceNameList []string + for _, item := range psl.Items { + gotPoolServiceNameList = append(gotPoolServiceNameList, item.Name) + } + sort.Strings(gotPoolServiceNameList) + + return gotPoolServiceNameList +} + +func assertPoolServicesLBClass(t testing.TB, psl *v1alpha1.PoolServiceList, lbClass *string) { + t.Helper() + + for _, ps := range psl.Items { + if *ps.Spec.LoadBalancerClass != *lbClass { + t.Errorf("expected loadbalancer class %s, but got %s", *lbClass, *ps.Spec.LoadBalancerClass) + } + } +} + +func assertPoolServiceLabels(t testing.TB, psl *v1alpha1.PoolServiceList, serviceName string) { + t.Helper() + + for _, ps := range psl.Items { + if ps.Labels == nil { + t.Errorf("expected labels not nil") + return + } + if ps.Labels[labelManageBy] != names.LoadBalancerSetController { + t.Errorf("expected pool service managed by %s", names.LoadBalancerSetController) + } + if ps.Labels[network.LabelServiceName] != serviceName { + t.Errorf("expected pool service name is %s, but got %s", serviceName, ps.Labels[network.LabelServiceName]) + } + } +} + +func newPoolService(namespace string, poolName string, annotations map[string]string, lbIngress []corev1.LoadBalancerIngress) *v1alpha1.PoolService { + blockOwnerDeletion := true + controller := true + return &v1alpha1.PoolService{ + TypeMeta: v1.TypeMeta{ + Kind: "PoolService", + APIVersion: v1alpha1.GroupVersion.String(), + }, + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + Name: mockServiceName + "-" + poolName, + Labels: map[string]string{network.LabelServiceName: mockServiceName, network.LabelNodePoolName: poolName, labelManageBy: names.LoadBalancerSetController}, + Annotations: annotations, + OwnerReferences: []v1.OwnerReference{ + { + APIVersion: "v1", + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: &controller, + Kind: "Service", + Name: mockServiceName, + UID: mockServiceUid, + }, { + APIVersion: "apps.openyurt.io/v1alpha1", + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: &controller, + Kind: "NodePool", + Name: poolName, + UID: mockNodePoolUid, + }, + }, + }, + Spec: v1alpha1.PoolServiceSpec{ + LoadBalancerClass: &elbClass, + }, + Status: v1alpha1.PoolServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{ + Ingress: lbIngress, + }, + }, + } +} + +func assertLoadBalancerStatusEqual(t testing.TB, expected, got []corev1.LoadBalancerIngress) { + t.Helper() + + if len(expected) != len(got) { + t.Errorf("expected %v, but got %v", expected, got) + return + } + + sort.Slice(expected, func(i, j int) bool { + return expected[i].IP < expected[j].IP + }) + + for i := 0; i < len(expected); i++ { + if !reflect.DeepEqual(expected[i], got[i]) { + t.Errorf("expected %++v, but got %++v", expected[i], got[i]) + } + } +} + +func assertOpenYurtAnnotationsEqual(t testing.TB, expected, got map[string]string) { + t.Helper() + + for k, v := range expected { + if !strings.HasPrefix(k, network.AggregateAnnotationsKeyPrefix) || (k == network.AnnotationNodePoolSelector) { + continue + } + if got[k] != v { + t.Errorf("expected key %s value is %s, but got %s", k, v, got[k]) + } + } + + for k, v := range got { + if !strings.HasPrefix(k, network.AggregateAnnotationsKeyPrefix) || (k == network.AnnotationNodePoolSelector) { + continue + } + + if expected[k] != v { + t.Errorf("expected key value %s is %s, but got %s", k, expected[k], v) + } + } +} + +func assertResourceVersion(t testing.TB, expected, got string) { + t.Helper() + + if expected != got { + t.Errorf("expected resource version is %s, but got %s", expected, got) + } +} + +func assertFinalizerExist(t testing.TB, svc *corev1.Service) { + t.Helper() + + if !controllerutil.ContainsFinalizer(svc, poolServiceFinalizer) { + t.Errorf("expected service has finalizer %s", poolServiceFinalizer) + } +} + +func assertFinalizerNotExist(t testing.TB, svc *corev1.Service) { + t.Helper() + + if controllerutil.ContainsFinalizer(svc, poolServiceFinalizer) { + t.Errorf("expected service finalizer %s is not exist, but it is exist", poolServiceFinalizer) + } +} + +func assertPoolServiceIsDeleting(t testing.TB, ps *v1alpha1.PoolService) { + t.Helper() + + if ps.DeletionTimestamp.IsZero() { + t.Errorf("expected pool service is deleting") + } +} + +func assertNotFountError(t testing.TB, err error) { + t.Helper() + + if !apierrors.IsNotFound(err) { + t.Errorf("exptected error is not found, but got %v", err) + } +} + +func assertOwnerReferences(t testing.TB, expected, got []v1.OwnerReference) { + t.Helper() + + if !reflect.DeepEqual(expected, got) { + t.Errorf("expected owner reference is %v, but got %v", expected, got) + } +} + +func assertPoolServiceCounts(t testing.TB, expected, got int) { + t.Helper() + + if expected != got { + t.Errorf("expected pool service counts is %d, but got %d", expected, got) + } +} + +func assertString(t testing.TB, expected, got string) { + t.Helper() + + if expected != got { + t.Errorf("expected %s, but got %s", expected, got) + } +} diff --git a/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/predicate.go b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/predicate.go new file mode 100644 index 00000000000..c329d17b151 --- /dev/null +++ b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/predicate.go @@ -0,0 +1,255 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadbalancerset + +import ( + "reflect" + + v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "github.com/openyurtio/openyurt/cmd/yurt-manager/names" + "github.com/openyurtio/openyurt/pkg/apis/apps/v1beta1" + "github.com/openyurtio/openyurt/pkg/apis/network" + "github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1" +) + +func NewServicePredicated() predicate.Predicate { + return predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + svc, ok := createEvent.Object.(*v1.Service) + if !ok { + return false + } + + return isLoadBalancerSetService(svc) + }, + UpdateFunc: func(event event.UpdateEvent) bool { + oldSvc, ok := event.ObjectOld.(*v1.Service) + if !ok { + return false + } + + newSvc, ok := event.ObjectNew.(*v1.Service) + if !ok { + return false + } + + if !isLoadBalancerSetService(oldSvc) && !isLoadBalancerSetService(newSvc) { + return false + } + + return isServiceChange(oldSvc, newSvc) + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + svc, ok := deleteEvent.Object.(*v1.Service) + if !ok { + return false + } + + return isLoadBalancerSetService(svc) + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + svc, ok := genericEvent.Object.(*v1.Service) + if !ok { + return false + } + + return isLoadBalancerSetService(svc) + }, + } +} + +func isLoadBalancerSetService(svc *v1.Service) bool { + if svc.Spec.Type != v1.ServiceTypeLoadBalancer { + return false + } + + if svc.Spec.LoadBalancerClass == nil { + return false + } + + if svc.Annotations == nil { + return false + } + + if len(svc.Annotations[network.AnnotationNodePoolSelector]) == 0 { + return false + } + + return true +} + +func isServiceChange(oldSvc, newSvc *v1.Service) bool { + if isDeleteTimeChange(oldSvc, newSvc) { + return true + } + + if isNodePoolSelectorChange(oldSvc.Annotations, newSvc.Annotations) { + return true + } + + if isServiceTypeChange(oldSvc, newSvc) { + return true + } + + if isFinalizersChange(oldSvc, newSvc) { + return true + } + + if isAggregateAnnotationsChange(oldSvc, newSvc) { + return true + } + + return false +} + +func isDeleteTimeChange(oldSvc, newSvc *v1.Service) bool { + return oldSvc.DeletionTimestamp != newSvc.DeletionTimestamp +} + +func isNodePoolSelectorChange(oldAnnotations, newAnnotations map[string]string) bool { + return !annotationValueIsEqual(oldAnnotations, newAnnotations, network.AnnotationNodePoolSelector) +} + +func isServiceTypeChange(oldSvc, newSvc *v1.Service) bool { + return oldSvc.Spec.Type != newSvc.Spec.Type +} + +func isFinalizersChange(oldSvc, newSvc *v1.Service) bool { + return !reflect.DeepEqual(oldSvc.Finalizers, newSvc.Finalizers) +} + +func isAggregateAnnotationsChange(oldSvc, newSvc *v1.Service) bool { + oldAggregateAnnotations := filterIgnoredKeys(oldSvc.Annotations) + newAggregateAnnotations := filterIgnoredKeys(newSvc.Annotations) + + return !reflect.DeepEqual(oldAggregateAnnotations, newAggregateAnnotations) +} + +func NewPoolServicePredicated() predicate.Predicate { + return predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + ps, ok := createEvent.Object.(*v1alpha1.PoolService) + if !ok { + return false + } + return predicatedPoolService(ps) + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + oldPs, ok := updateEvent.ObjectOld.(*v1alpha1.PoolService) + if !ok { + return false + } + + newPS, ok := updateEvent.ObjectNew.(*v1alpha1.PoolService) + if !ok { + return false + } + return predicatedPoolService(oldPs) || + predicatedPoolService(newPS) + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + ps, ok := deleteEvent.Object.(*v1alpha1.PoolService) + if !ok { + return false + } + return predicatedPoolService(ps) + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + ps, ok := genericEvent.Object.(*v1alpha1.PoolService) + if !ok { + return false + } + return predicatedPoolService(ps) + }, + } +} + +func predicatedPoolService(ps *v1alpha1.PoolService) bool { + if ps.Labels == nil { + return false + } + + if !hasServiceName(ps) { + return false + } + + return managedByController(ps) +} + +func hasServiceName(ps *v1alpha1.PoolService) bool { + if _, ok := ps.Labels[network.LabelServiceName]; !ok { + return false + } + return true +} + +func managedByController(ps *v1alpha1.PoolService) bool { + return ps.Labels[labelManageBy] == names.LoadBalancerSetController +} + +func NewNodePoolPredicated() predicate.Predicate { + return predicate.Funcs{ + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + oldNp, ok := updateEvent.ObjectOld.(*v1beta1.NodePool) + if !ok { + return false + } + + newNp, ok := updateEvent.ObjectNew.(*v1beta1.NodePool) + if !ok { + return false + } + + return isNodePoolChange(oldNp, newNp) + }, + CreateFunc: func(createEvent event.CreateEvent) bool { + np, ok := createEvent.Object.(*v1beta1.NodePool) + if !ok { + return false + } + return nodePoolHasLabels(np) + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + np, ok := deleteEvent.Object.(*v1beta1.NodePool) + if !ok { + return false + } + return nodePoolHasLabels(np) + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + np, ok := genericEvent.Object.(*v1beta1.NodePool) + if !ok { + return false + } + return nodePoolHasLabels(np) + }, + } +} + +func isNodePoolChange(oldNp, newNp *v1beta1.NodePool) bool { + if !reflect.DeepEqual(oldNp.Labels, newNp.Labels) { + return true + } + return false +} + +func nodePoolHasLabels(np *v1beta1.NodePool) bool { + return len(np.Labels) != 0 +} diff --git a/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/predicate_test.go b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/predicate_test.go new file mode 100644 index 00000000000..b293f4cbe4e --- /dev/null +++ b/pkg/yurtmanager/controller/loadbalancerset/loadbalancerset/predicate_test.go @@ -0,0 +1,263 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadbalancerset + +import ( + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/event" + + "github.com/openyurtio/openyurt/pkg/apis/network" +) + +const ( + mockAnnotationLbId = network.AggregateAnnotationsKeyPrefix + "/lb-id" +) + +func TestServicePredicate(t *testing.T) { + f := NewServicePredicated() + t.Run("create/delete/generic service enqueue", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + + assertBool(t, true, f.Create(event.CreateEvent{Object: svc})) + assertBool(t, true, f.Delete(event.DeleteEvent{Object: svc})) + assertBool(t, true, f.Generic(event.GenericEvent{Object: svc})) + + }) + + t.Run("create/delete not lb type service not enqueue", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + svc.Spec.Type = v1.ServiceTypeClusterIP + + assertBool(t, false, f.Create(event.CreateEvent{Object: svc})) + assertBool(t, false, f.Delete(event.DeleteEvent{Object: svc})) + assertBool(t, false, f.Generic(event.GenericEvent{Object: svc})) + }) + + t.Run("create/delete not service class annotations service not enqueue", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + svc.Spec.LoadBalancerClass = nil + + assertBool(t, false, f.Create(event.CreateEvent{Object: svc})) + assertBool(t, false, f.Delete(event.DeleteEvent{Object: svc})) + assertBool(t, false, f.Generic(event.GenericEvent{Object: svc})) + }) + + t.Run("create not nodepool selector annotations not enqueue", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + delete(svc.Annotations, network.AnnotationNodePoolSelector) + + assertBool(t, false, f.Create(event.CreateEvent{Object: svc})) + assertBool(t, false, f.Delete(event.DeleteEvent{Object: svc})) + assertBool(t, false, f.Generic(event.GenericEvent{Object: svc})) + }) + + t.Run("update service not enqueue", func(t *testing.T) { + svc1 := newService(v1.NamespaceDefault, mockServiceName) + svc2 := newService(v1.NamespaceDefault, mockServiceName) + svc2.Spec.ClusterIP = "1.2.3.4" + + got := f.Update(event.UpdateEvent{ObjectOld: svc1, ObjectNew: svc2}) + assertBool(t, false, got) + }) + + t.Run("delete selector annotations", func(t *testing.T) { + svc1 := newService(v1.NamespaceDefault, mockServiceName) + svc2 := newService(v1.NamespaceDefault, mockServiceName) + delete(svc2.Annotations, network.AnnotationNodePoolSelector) + + got := f.Update(event.UpdateEvent{ObjectOld: svc1, ObjectNew: svc2}) + assertBool(t, true, got) + }) + + t.Run("modify selector annotations", func(t *testing.T) { + svc1 := newService(v1.NamespaceDefault, mockServiceName) + svc2 := newService(v1.NamespaceDefault, mockServiceName) + svc2.Annotations[network.AnnotationNodePoolSelector] = "app=online" + + got := f.Update(event.UpdateEvent{ObjectOld: svc1, ObjectNew: svc2}) + assertBool(t, true, got) + }) + + t.Run("add selector annotations", func(t *testing.T) { + svc1 := newService(v1.NamespaceDefault, mockServiceName) + delete(svc1.Annotations, network.AnnotationNodePoolSelector) + svc2 := newService(v1.NamespaceDefault, mockServiceName) + + got := f.Update(event.UpdateEvent{ObjectOld: svc1, ObjectNew: svc2}) + assertBool(t, true, got) + }) + + t.Run("modify service type", func(t *testing.T) { + svc1 := newService(v1.NamespaceDefault, mockServiceName) + svc2 := newService(v1.NamespaceDefault, mockServiceName) + svc2.Spec.Type = v1.ServiceTypeClusterIP + + got := f.Update(event.UpdateEvent{ObjectOld: svc1, ObjectNew: svc2}) + assertBool(t, true, got) + }) + + t.Run("modify finalizer", func(t *testing.T) { + svc1 := newService(v1.NamespaceDefault, mockServiceName) + svc1.Finalizers = []string{poolServiceFinalizer} + svc2 := newService(v1.NamespaceDefault, mockServiceName) + svc2.Finalizers = []string{"test-finalizer"} + + got := f.Update(event.UpdateEvent{ObjectOld: svc1, ObjectNew: svc2}) + assertBool(t, true, got) + }) + + t.Run("modify aggregate annotations", func(t *testing.T) { + svc1 := newService(v1.NamespaceDefault, mockServiceName) + svc1.Annotations[mockAnnotationLbId] = "lb123" + svc2 := newService(v1.NamespaceDefault, mockServiceName) + svc2.Annotations[mockAnnotationLbId] = "lb123,lb234" + + got := f.Update(event.UpdateEvent{ObjectOld: svc1, ObjectNew: svc2}) + assertBool(t, true, got) + }) + + t.Run("modify not aggregate annotations", func(t *testing.T) { + svc1 := newService(v1.NamespaceDefault, mockServiceName) + svc1.Annotations["key1"] = "value1" + svc2 := newService(v1.NamespaceDefault, mockServiceName) + svc2.Annotations["key2"] = "value2" + + got := f.Update(event.UpdateEvent{ObjectOld: svc1, ObjectNew: svc2}) + assertBool(t, false, got) + }) + + t.Run("modify finalizer with not multi lb services", func(t *testing.T) { + svc1 := newService(v1.NamespaceDefault, mockServiceName) + delete(svc1.Annotations, network.AnnotationNodePoolSelector) + svc2 := newService(v1.NamespaceDefault, mockServiceName) + svc2.Finalizers = []string{"test-finalizer"} + delete(svc2.Annotations, network.AnnotationNodePoolSelector) + + got := f.Update(event.UpdateEvent{ObjectOld: svc1, ObjectNew: svc2}) + assertBool(t, false, got) + }) + + t.Run("update delete time", func(t *testing.T) { + svc1 := newService(v1.NamespaceDefault, mockServiceName) + svc2 := newService(v1.NamespaceDefault, mockServiceName) + svc2.DeletionTimestamp = &metav1.Time{Time: time.Now()} + + got := f.Update(event.UpdateEvent{ObjectOld: svc1, ObjectNew: svc2}) + assertBool(t, true, got) + }) +} + +func assertBool(t testing.TB, expected, got bool) { + t.Helper() + + if expected != got { + t.Errorf("expected %v, but got %v", expected, got) + } +} + +func TestPoolServicePredicated(t *testing.T) { + f := NewPoolServicePredicated() + + t.Run("create/delete/update/generic pool service not managed and service name", func(t *testing.T) { + ps := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + delete(ps.Labels, labelManageBy) + delete(ps.Labels, network.LabelServiceName) + + assertBool(t, false, f.Create(event.CreateEvent{Object: ps})) + assertBool(t, false, f.Update(event.UpdateEvent{ObjectOld: ps, ObjectNew: ps})) + assertBool(t, false, f.Delete(event.DeleteEvent{Object: ps})) + assertBool(t, false, f.Generic(event.GenericEvent{Object: ps})) + }) + + t.Run("create/delete/update/generic pool service not managed", func(t *testing.T) { + ps := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + delete(ps.Labels, labelManageBy) + + assertBool(t, false, f.Create(event.CreateEvent{Object: ps})) + assertBool(t, false, f.Update(event.UpdateEvent{ObjectOld: ps, ObjectNew: ps})) + assertBool(t, false, f.Delete(event.DeleteEvent{Object: ps})) + assertBool(t, false, f.Generic(event.GenericEvent{Object: ps})) + }) + + t.Run("create/delete/update/generic pool service not service name", func(t *testing.T) { + ps := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + delete(ps.Labels, network.LabelServiceName) + + assertBool(t, false, f.Create(event.CreateEvent{Object: ps})) + assertBool(t, false, f.Update(event.UpdateEvent{ObjectOld: ps, ObjectNew: ps})) + assertBool(t, false, f.Delete(event.DeleteEvent{Object: ps})) + assertBool(t, false, f.Generic(event.GenericEvent{Object: ps})) + + }) + + t.Run("create/delete/update/generic pool service", func(t *testing.T) { + ps := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + + assertBool(t, true, f.Create(event.CreateEvent{Object: ps})) + assertBool(t, true, f.Update(event.UpdateEvent{ObjectOld: ps, ObjectNew: ps})) + assertBool(t, true, f.Delete(event.DeleteEvent{Object: ps})) + assertBool(t, true, f.Generic(event.GenericEvent{Object: ps})) + + }) + + t.Run("create/delete/update/generic pool service not service name", func(t *testing.T) { + ps1 := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + ps2 := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + delete(ps2.Labels, network.LabelServiceName) + delete(ps2.Labels, labelManageBy) + + assertBool(t, true, f.Update(event.UpdateEvent{ObjectOld: ps1, ObjectNew: ps2})) + }) +} + +func TestNodePoolPredicated(t *testing.T) { + f := NewNodePoolPredicated() + + t.Run("create/delete/generic nodepool predicated", func(t *testing.T) { + np := newNodepool("np123", "name=np123") + assertBool(t, true, f.Create(event.CreateEvent{Object: np})) + assertBool(t, true, f.Delete(event.DeleteEvent{Object: np})) + assertBool(t, true, f.Generic(event.GenericEvent{Object: np})) + }) + + t.Run("create/delete/generic nodepool not predicated", func(t *testing.T) { + np := newNodepool("np123", "") + assertBool(t, false, f.Create(event.CreateEvent{Object: np})) + assertBool(t, false, f.Delete(event.DeleteEvent{Object: np})) + assertBool(t, false, f.Generic(event.GenericEvent{Object: np})) + }) + + t.Run("update nodepool label predicated", func(t *testing.T) { + np1 := newNodepool("np123", "name=np124") + np2 := newNodepool("np123", "name=np124") + np2.Labels["app"] = "deploy" + assertBool(t, true, f.Update(event.UpdateEvent{ObjectOld: np1, ObjectNew: np2})) + }) + + t.Run("update nodepool not predicated", func(t *testing.T) { + np1 := newNodepool("np123", "name=np124") + np2 := newNodepool("np123", "name=np124") + np2.Spec.Type = "Cloud" + + assertBool(t, false, f.Update(event.UpdateEvent{ObjectOld: np1, ObjectNew: np2})) + }) + +} diff --git a/pkg/yurtmanager/controller/poolservice/poolservice_controller.go b/pkg/yurtmanager/controller/poolservice/poolservice_controller.go deleted file mode 100644 index 16e8320f676..00000000000 --- a/pkg/yurtmanager/controller/poolservice/poolservice_controller.go +++ /dev/null @@ -1,136 +0,0 @@ -/* -Copyright 2024 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the License); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an AS IS BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package poolservice - -import ( - "context" - "flag" - "fmt" - - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" - "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" - - appconfig "github.com/openyurtio/openyurt/cmd/yurt-manager/app/config" - "github.com/openyurtio/openyurt/cmd/yurt-manager/names" - netv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1" - "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/poolservice/config" -) - -func init() { - flag.IntVar(&concurrentReconciles, "poolservice-workers", concurrentReconciles, "Max concurrent workers for PoolService controller.") -} - -var ( - concurrentReconciles = 3 - controllerKind = netv1alpha1.SchemeGroupVersion.WithKind("PoolService") -) - -func Format(format string, args ...interface{}) string { - s := fmt.Sprintf(format, args...) - return fmt.Sprintf("%s: %s", names.PoolServiceController, s) -} - -// Add creates a new PoolService Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller -// and Start it when the Manager is Started. -func Add(ctx context.Context, c *appconfig.CompletedConfig, mgr manager.Manager) error { - klog.Infof(Format("poolservice-controller add controller %s", controllerKind.String())) - return add(mgr, newReconciler(c, mgr)) -} - -var _ reconcile.Reconciler = &ReconcilePoolService{} - -// ReconcilePoolService reconciles a PoolService object -type ReconcilePoolService struct { - client.Client - scheme *runtime.Scheme - recorder record.EventRecorder - Configration config.PoolServiceControllerConfiguration -} - -// newReconciler returns a new reconcile.Reconciler -func newReconciler(c *appconfig.CompletedConfig, mgr manager.Manager) reconcile.Reconciler { - return &ReconcilePoolService{ - Client: mgr.GetClient(), - scheme: mgr.GetScheme(), - recorder: mgr.GetEventRecorderFor(names.PoolServiceController), - Configration: c.ComponentConfig.PoolServiceController, - } -} - -// add adds a new Controller to mgr with r as the reconcile.Reconciler -func add(mgr manager.Manager, r reconcile.Reconciler) error { - // Create a new controller - c, err := controller.New(names.PoolServiceController, mgr, controller.Options{ - Reconciler: r, MaxConcurrentReconciles: concurrentReconciles, - }) - if err != nil { - return err - } - - // Watch for changes to PoolService - err = c.Watch(&source.Kind{Type: &netv1alpha1.PoolService{}}, &handler.EnqueueRequestForObject{}) - if err != nil { - return err - } - - return nil -} - -// +kubebuilder:rbac:groups=net.openyurt.io,resources=poolservices,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=net.openyurt.io,resources=poolservices/status,verbs=get;update;patch - -// Reconcile reads that state of the cluster for a PoolService object and makes changes based on the state read -// and what is in the PoolService.Spec -func (r *ReconcilePoolService) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) { - - // Note !!!!!!!!!! - // We strongly recommend use Format() to encapsulation because Format() can print logs by module - // @kadisi - klog.Infof(Format("Reconcile PoolService %s/%s", request.Namespace, request.Name)) - - // Fetch the PoolService instance - instance := &netv1alpha1.PoolService{} - err := r.Get(context.TODO(), request.NamespacedName, instance) - if err != nil { - if errors.IsNotFound(err) { - return reconcile.Result{}, nil - } - return reconcile.Result{}, err - } - - if instance.DeletionTimestamp != nil { - return reconcile.Result{}, nil - } - - // Update Status - - // Update Instance - //if err = r.Update(context.TODO(), instance); err != nil { - // klog.Errorf(Format("Update PoolService %s error %v", klog.KObj(instance), err)) - // return reconcile.Result{Requeue: true}, err - //} - - return reconcile.Result{}, nil -} diff --git a/pkg/yurtmanager/webhook/poolservice/v1alpha1/poolservice_default.go b/pkg/yurtmanager/webhook/poolservice/v1alpha1/poolservice_default.go deleted file mode 100644 index 94ca8876525..00000000000 --- a/pkg/yurtmanager/webhook/poolservice/v1alpha1/poolservice_default.go +++ /dev/null @@ -1,39 +0,0 @@ -/* -Copyright 2024 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the License); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an AS IS BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package v1alpha1 - -import ( - "context" - "fmt" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" - - "github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1" -) - -// Default satisfies the defaulting webhook interface. -func (webhook *PoolServiceHandler) Default(ctx context.Context, obj runtime.Object) error { - ps, ok := obj.(*v1alpha1.PoolService) - if !ok { - return apierrors.NewBadRequest(fmt.Sprintf("expected a PoolService but got a %T", obj)) - } - - v1alpha1.SetDefaultsPoolService(ps) - - return nil -} diff --git a/pkg/yurtmanager/webhook/poolservice/v1alpha1/poolservice_handler.go b/pkg/yurtmanager/webhook/poolservice/v1alpha1/poolservice_handler.go deleted file mode 100644 index eccfba0f105..00000000000 --- a/pkg/yurtmanager/webhook/poolservice/v1alpha1/poolservice_handler.go +++ /dev/null @@ -1,56 +0,0 @@ -/* -Copyright 2024 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the License); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an AS IS BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package v1alpha1 - -import ( - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" - "sigs.k8s.io/controller-runtime/pkg/webhook" - - "github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1" - "github.com/openyurtio/openyurt/pkg/yurtmanager/webhook/util" -) - -// SetupWebhookWithManager sets up Cluster webhooks. mutate path, validatepath, error -func (webhook *PoolServiceHandler) SetupWebhookWithManager(mgr ctrl.Manager) (string, string, error) { - // init - webhook.Client = mgr.GetClient() - - gvk, err := apiutil.GVKForObject(&v1alpha1.PoolService{}, mgr.GetScheme()) - if err != nil { - return "", "", err - } - return util.GenerateMutatePath(gvk), - util.GenerateValidatePath(gvk), - ctrl.NewWebhookManagedBy(mgr). - For(&v1alpha1.PoolService{}). - WithDefaulter(webhook). - WithValidator(webhook). - Complete() -} - -// +kubebuilder:webhook:path=/validate-net-openyurt-io-poolservice,mutating=false,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=net.openyurt.io,resources=poolservices,verbs=create;update,versions=v1alpha1,name=validate.net.v1alpha1.poolservice.openyurt.io -// +kubebuilder:webhook:path=/mutate-net-openyurt-io-poolservice,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=net.openyurt.io,resources=poolservices,verbs=create;update,versions=v1alpha1,name=mutate.net.v1alpha1.poolservice.openyurt.io - -// Cluster implements a validating and defaulting webhook for Cluster. -type PoolServiceHandler struct { - Client client.Client -} - -var _ webhook.CustomDefaulter = &PoolServiceHandler{} -var _ webhook.CustomValidator = &PoolServiceHandler{} diff --git a/pkg/yurtmanager/webhook/poolservice/v1alpha1/poolservice_validation.go b/pkg/yurtmanager/webhook/poolservice/v1alpha1/poolservice_validation.go deleted file mode 100644 index 7ff5c15aa03..00000000000 --- a/pkg/yurtmanager/webhook/poolservice/v1alpha1/poolservice_validation.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright 2024 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the License); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an AS IS BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package v1alpha1 - -import ( - "context" - "fmt" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/klog/v2" - - "github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1" -) - -// ValidateCreate implements webhook.CustomValidator so a webhook will be registered for the type. -func (webhook *PoolServiceHandler) ValidateCreate(ctx context.Context, obj runtime.Object) error { - ps, ok := obj.(*v1alpha1.PoolService) - if !ok { - return apierrors.NewBadRequest(fmt.Sprintf("expected a PoolService but got a %T", obj)) - } - - //validate - klog.Infof("need validate %+v", ps) - return nil -} - -// ValidateUpdate implements webhook.CustomValidator so a webhook will be registered for the type. -func (webhook *PoolServiceHandler) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) error { - newps, ok := newObj.(*v1alpha1.PoolService) - if !ok { - return apierrors.NewBadRequest(fmt.Sprintf("expected a PoolService but got a %T", newObj)) - } - oldps, ok := oldObj.(*v1alpha1.PoolService) - if !ok { - return apierrors.NewBadRequest(fmt.Sprintf("expected a PoolService} but got a %T", oldObj)) - } - - // validate - klog.Infof("need validate newps %+v, oldps %+v", newps, oldps) - - return nil -} - -// ValidateDelete implements webhook.CustomValidator so a webhook will be registered for the type. -func (webhook *PoolServiceHandler) ValidateDelete(_ context.Context, obj runtime.Object) error { - ps, ok := obj.(*v1alpha1.PoolService) - if !ok { - return apierrors.NewBadRequest(fmt.Sprintf("expected a PoolService but got a %T", obj)) - } - - // validate - klog.Infof("need validate %+v", ps) - return nil -}