From 1c706a67fbc6e91d6aef8f08fb08bd8c4a7a74a1 Mon Sep 17 00:00:00 2001 From: whitewindmills Date: Tue, 16 Apr 2024 17:46:55 +0800 Subject: [PATCH] Add finalizer for propagation policy Signed-off-by: whitewindmills --- pkg/detector/detector.go | 60 +++++++++---------- pkg/util/constants.go | 6 ++ .../clusterpropagationpolicy/mutating.go | 4 ++ pkg/webhook/propagationpolicy/mutating.go | 4 ++ 4 files changed, 44 insertions(+), 30 deletions(-) diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index 0b0f4b9bfd4e..ae8bce47156b 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -138,7 +138,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error { Version: policyv1alpha1.GroupVersion.Version, Resource: "propagationpolicies", } - policyHandler := fedinformer.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, d.OnPropagationPolicyDelete) + policyHandler := fedinformer.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, nil) d.InformerManager.ForResource(propagationPolicyGVR, policyHandler) d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR) @@ -148,7 +148,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error { Version: policyv1alpha1.GroupVersion.Version, Resource: "clusterpropagationpolicies", } - clusterPolicyHandler := fedinformer.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, d.OnClusterPropagationPolicyDelete) + clusterPolicyHandler := fedinformer.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, nil) d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler) d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR) @@ -981,17 +981,6 @@ func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) } } -// OnPropagationPolicyDelete handles object delete event and push the object to queue. -func (d *ResourceDetector) OnPropagationPolicyDelete(obj interface{}) { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return - } - - klog.V(2).Infof("Delete PropagationPolicy(%s)", key) - d.policyReconcileWorker.Add(key) -} - // ReconcilePropagationPolicy handles PropagationPolicy resource changes. // When adding a PropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and // put the object to queue. @@ -1007,19 +996,30 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error { unstructuredObj, err := d.propagationPolicyLister.Get(ckey.NamespaceKey()) if err != nil { if apierrors.IsNotFound(err) { - klog.Infof("PropagationPolicy(%s) has been removed.", ckey.NamespaceKey()) - return d.HandlePropagationPolicyDeletion(ckey.Namespace, ckey.Name) + return nil } klog.Errorf("Failed to get PropagationPolicy(%s): %v", ckey.NamespaceKey(), err) return err } - klog.Infof("PropagationPolicy(%s) has been added.", ckey.NamespaceKey()) propagationObject := &policyv1alpha1.PropagationPolicy{} if err = helper.ConvertToTypedObject(unstructuredObj, propagationObject); err != nil { klog.Errorf("Failed to convert PropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err) return err } + + if !propagationObject.DeletionTimestamp.IsZero() { + klog.Infof("PropagationPolicy(%s) has been deleted.", ckey.NamespaceKey()) + if err := d.HandlePropagationPolicyDeletion(propagationObject.Namespace, propagationObject.Name); err != nil { + return err + } + if controllerutil.RemoveFinalizer(propagationObject, util.PropagationPolicyControllerFinalizer) { + return d.Client.Update(context.TODO(), propagationObject) + } + return nil + } + + klog.Infof("PropagationPolicy(%s) has been added.", ckey.NamespaceKey()) return d.HandlePropagationPolicyCreationOrUpdate(propagationObject) } @@ -1091,17 +1091,6 @@ func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj inter } } -// OnClusterPropagationPolicyDelete handles object delete event and push the object to queue. -func (d *ResourceDetector) OnClusterPropagationPolicyDelete(obj interface{}) { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return - } - - klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key) - d.clusterPolicyReconcileWorker.Add(key) -} - // ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes. // When adding a ClusterPropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and // put the object to queue. @@ -1117,20 +1106,31 @@ func (d *ResourceDetector) ReconcileClusterPropagationPolicy(key util.QueueKey) unstructuredObj, err := d.clusterPropagationPolicyLister.Get(ckey.NamespaceKey()) if err != nil { if apierrors.IsNotFound(err) { - klog.Infof("ClusterPropagationPolicy(%s) has been removed.", ckey.NamespaceKey()) - return d.HandleClusterPropagationPolicyDeletion(ckey.Name) + return nil } klog.Errorf("Failed to get ClusterPropagationPolicy(%s): %v", ckey.NamespaceKey(), err) return err } - klog.Infof("Policy(%s) has been added", ckey.NamespaceKey()) propagationObject := &policyv1alpha1.ClusterPropagationPolicy{} if err = helper.ConvertToTypedObject(unstructuredObj, propagationObject); err != nil { klog.Errorf("Failed to convert ClusterPropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err) return err } + + if !propagationObject.DeletionTimestamp.IsZero() { + klog.Infof("ClusterPropagationPolicy(%s) has been deleted.", ckey.NamespaceKey()) + if err := d.HandleClusterPropagationPolicyDeletion(propagationObject.Name); err != nil { + return err + } + if controllerutil.RemoveFinalizer(propagationObject, util.ClusterPropagationPolicyControllerFinalizer) { + return d.Client.Update(context.TODO(), propagationObject) + } + return nil + } + + klog.Infof("ClusterPropagationPolicy(%s) has been added", ckey.NamespaceKey()) return d.HandleClusterPropagationPolicyCreationOrUpdate(propagationObject) } diff --git a/pkg/util/constants.go b/pkg/util/constants.go index 4f6c84a55f26..1dbe0cfd25b1 100644 --- a/pkg/util/constants.go +++ b/pkg/util/constants.go @@ -132,6 +132,12 @@ const ( // MCSControllerFinalizer is added to Cluster to ensure service work is deleted before itself is deleted. MCSControllerFinalizer = "karmada.io/multiclusterservice-controller" + + // PropagationPolicyControllerFinalizer is added to PropagationPolicy to ensure the related resources have been unbound before itself is deleted. + PropagationPolicyControllerFinalizer = "karmada.io/propagation-policy-controller" + + // ClusterPropagationPolicyControllerFinalizer is added to ClusterPropagationPolicy to ensure the related resources have been unbound before itself is deleted. + ClusterPropagationPolicyControllerFinalizer = "karmada.io/cluster-propagation-policy-controller" ) const ( diff --git a/pkg/webhook/clusterpropagationpolicy/mutating.go b/pkg/webhook/clusterpropagationpolicy/mutating.go index 20d1933a657f..0298ab5c4d76 100644 --- a/pkg/webhook/clusterpropagationpolicy/mutating.go +++ b/pkg/webhook/clusterpropagationpolicy/mutating.go @@ -22,9 +22,11 @@ import ( "fmt" "net/http" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/validation" ) @@ -80,6 +82,8 @@ func (a *MutatingAdmission) Handle(_ context.Context, req admission.Request) adm } } + controllerutil.AddFinalizer(policy, util.ClusterPropagationPolicyControllerFinalizer) + marshaledBytes, err := json.Marshal(policy) if err != nil { return admission.Errored(http.StatusInternalServerError, err) diff --git a/pkg/webhook/propagationpolicy/mutating.go b/pkg/webhook/propagationpolicy/mutating.go index 7bd5879a63a3..258a79c33327 100644 --- a/pkg/webhook/propagationpolicy/mutating.go +++ b/pkg/webhook/propagationpolicy/mutating.go @@ -23,9 +23,11 @@ import ( "net/http" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/validation" ) @@ -92,6 +94,8 @@ func (a *MutatingAdmission) Handle(_ context.Context, req admission.Request) adm } } + controllerutil.AddFinalizer(policy, util.PropagationPolicyControllerFinalizer) + marshaledBytes, err := json.Marshal(policy) if err != nil { return admission.Errored(http.StatusInternalServerError, err)