Skip to content

Commit

Permalink
Add finalizer for propagation policy
Browse files Browse the repository at this point in the history
Signed-off-by: whitewindmills <[email protected]>
  • Loading branch information
whitewindmills committed Apr 16, 2024
1 parent fdad87e commit 1c706a6
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 30 deletions.
60 changes: 30 additions & 30 deletions pkg/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
Version: policyv1alpha1.GroupVersion.Version,
Resource: "propagationpolicies",
}
policyHandler := fedinformer.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, d.OnPropagationPolicyDelete)
policyHandler := fedinformer.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, nil)
d.InformerManager.ForResource(propagationPolicyGVR, policyHandler)
d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR)

Expand All @@ -148,7 +148,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
Version: policyv1alpha1.GroupVersion.Version,
Resource: "clusterpropagationpolicies",
}
clusterPolicyHandler := fedinformer.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, d.OnClusterPropagationPolicyDelete)
clusterPolicyHandler := fedinformer.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, nil)
d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler)
d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR)

Expand Down Expand Up @@ -981,17 +981,6 @@ func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{})
}
}

// OnPropagationPolicyDelete handles object delete event and push the object to queue.
func (d *ResourceDetector) OnPropagationPolicyDelete(obj interface{}) {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return
}

klog.V(2).Infof("Delete PropagationPolicy(%s)", key)
d.policyReconcileWorker.Add(key)
}

// ReconcilePropagationPolicy handles PropagationPolicy resource changes.
// When adding a PropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and
// put the object to queue.
Expand All @@ -1007,19 +996,30 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error {
unstructuredObj, err := d.propagationPolicyLister.Get(ckey.NamespaceKey())
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("PropagationPolicy(%s) has been removed.", ckey.NamespaceKey())
return d.HandlePropagationPolicyDeletion(ckey.Namespace, ckey.Name)
return nil
}
klog.Errorf("Failed to get PropagationPolicy(%s): %v", ckey.NamespaceKey(), err)
return err
}

klog.Infof("PropagationPolicy(%s) has been added.", ckey.NamespaceKey())
propagationObject := &policyv1alpha1.PropagationPolicy{}
if err = helper.ConvertToTypedObject(unstructuredObj, propagationObject); err != nil {
klog.Errorf("Failed to convert PropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err)
return err
}

if !propagationObject.DeletionTimestamp.IsZero() {
klog.Infof("PropagationPolicy(%s) has been deleted.", ckey.NamespaceKey())
if err := d.HandlePropagationPolicyDeletion(propagationObject.Namespace, propagationObject.Name); err != nil {
return err
}
if controllerutil.RemoveFinalizer(propagationObject, util.PropagationPolicyControllerFinalizer) {
return d.Client.Update(context.TODO(), propagationObject)
}
return nil
}

klog.Infof("PropagationPolicy(%s) has been added.", ckey.NamespaceKey())
return d.HandlePropagationPolicyCreationOrUpdate(propagationObject)
}

Expand Down Expand Up @@ -1091,17 +1091,6 @@ func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj inter
}
}

// OnClusterPropagationPolicyDelete handles object delete event and push the object to queue.
func (d *ResourceDetector) OnClusterPropagationPolicyDelete(obj interface{}) {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return
}

klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key)
d.clusterPolicyReconcileWorker.Add(key)
}

// ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes.
// When adding a ClusterPropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and
// put the object to queue.
Expand All @@ -1117,20 +1106,31 @@ func (d *ResourceDetector) ReconcileClusterPropagationPolicy(key util.QueueKey)
unstructuredObj, err := d.clusterPropagationPolicyLister.Get(ckey.NamespaceKey())
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("ClusterPropagationPolicy(%s) has been removed.", ckey.NamespaceKey())
return d.HandleClusterPropagationPolicyDeletion(ckey.Name)
return nil
}

klog.Errorf("Failed to get ClusterPropagationPolicy(%s): %v", ckey.NamespaceKey(), err)
return err
}

klog.Infof("Policy(%s) has been added", ckey.NamespaceKey())
propagationObject := &policyv1alpha1.ClusterPropagationPolicy{}
if err = helper.ConvertToTypedObject(unstructuredObj, propagationObject); err != nil {
klog.Errorf("Failed to convert ClusterPropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err)
return err
}

if !propagationObject.DeletionTimestamp.IsZero() {
klog.Infof("ClusterPropagationPolicy(%s) has been deleted.", ckey.NamespaceKey())
if err := d.HandleClusterPropagationPolicyDeletion(propagationObject.Name); err != nil {
return err
}
if controllerutil.RemoveFinalizer(propagationObject, util.ClusterPropagationPolicyControllerFinalizer) {
return d.Client.Update(context.TODO(), propagationObject)
}
return nil
}

klog.Infof("ClusterPropagationPolicy(%s) has been added", ckey.NamespaceKey())
return d.HandleClusterPropagationPolicyCreationOrUpdate(propagationObject)
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/util/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ const (

// MCSControllerFinalizer is added to Cluster to ensure service work is deleted before itself is deleted.
MCSControllerFinalizer = "karmada.io/multiclusterservice-controller"

// PropagationPolicyControllerFinalizer is added to PropagationPolicy to ensure the related resources have been unbound before itself is deleted.
PropagationPolicyControllerFinalizer = "karmada.io/propagation-policy-controller"

// ClusterPropagationPolicyControllerFinalizer is added to ClusterPropagationPolicy to ensure the related resources have been unbound before itself is deleted.
ClusterPropagationPolicyControllerFinalizer = "karmada.io/cluster-propagation-policy-controller"
)

const (
Expand Down
4 changes: 4 additions & 0 deletions pkg/webhook/clusterpropagationpolicy/mutating.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"fmt"
"net/http"

"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/validation"
)
Expand Down Expand Up @@ -80,6 +82,8 @@ func (a *MutatingAdmission) Handle(_ context.Context, req admission.Request) adm
}
}

controllerutil.AddFinalizer(policy, util.ClusterPropagationPolicyControllerFinalizer)

marshaledBytes, err := json.Marshal(policy)
if err != nil {
return admission.Errored(http.StatusInternalServerError, err)
Expand Down
4 changes: 4 additions & 0 deletions pkg/webhook/propagationpolicy/mutating.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"net/http"

"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/validation"
)
Expand Down Expand Up @@ -92,6 +94,8 @@ func (a *MutatingAdmission) Handle(_ context.Context, req admission.Request) adm
}
}

controllerutil.AddFinalizer(policy, util.PropagationPolicyControllerFinalizer)

marshaledBytes, err := json.Marshal(policy)
if err != nil {
return admission.Errored(http.StatusInternalServerError, err)
Expand Down

0 comments on commit 1c706a6

Please sign in to comment.