From c770cc10f39975dc25c3c9c1136aa1af8760050d Mon Sep 17 00:00:00 2001 From: dddddai Date: Sat, 22 Jan 2022 08:36:50 +0800 Subject: [PATCH] bugfix: resource binding is not created reconcile PP/CPPs and resource templates in one goroutine Signed-off-by: dddddai --- pkg/detector/detector.go | 140 ++++++++++----------------------------- 1 file changed, 36 insertions(+), 104 deletions(-) diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index 3bb2ac971f41..85f5cd37ae85 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -54,14 +54,8 @@ type ResourceDetector struct { // ResourceInterpreter knows the details of resource structure. ResourceInterpreter resourceinterpreter.ResourceInterpreter EventRecorder record.EventRecorder - // policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and - // a reconcile function to consume the items in queue. - policyReconcileWorker util.AsyncWorker - propagationPolicyLister cache.GenericLister - // clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and - // a reconcile function to consume the items in queue. - clusterPolicyReconcileWorker util.AsyncWorker + propagationPolicyLister cache.GenericLister clusterPropagationPolicyLister cache.GenericLister // bindingReconcileWorker maintains a rate limited queue which used to store ResourceBinding's key and @@ -85,30 +79,17 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.waitingObjects = make(map[keys.ClusterWideKey]struct{}) d.stopCh = ctx.Done() - // setup policy reconcile worker - d.policyReconcileWorker = util.NewAsyncWorker("propagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcilePropagationPolicy) - d.policyReconcileWorker.Run(1, d.stopCh) - d.clusterPolicyReconcileWorker = util.NewAsyncWorker("clusterPropagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcileClusterPropagationPolicy) - d.clusterPolicyReconcileWorker.Run(1, d.stopCh) - - // watch and enqueue PropagationPolicy changes. propagationPolicyGVR := schema.GroupVersionResource{ Group: policyv1alpha1.GroupVersion.Group, Version: policyv1alpha1.GroupVersion.Version, Resource: "propagationpolicies", } - policyHandler := informermanager.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, d.OnPropagationPolicyDelete) - d.InformerManager.ForResource(propagationPolicyGVR, policyHandler) - d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR) - - // watch and enqueue ClusterPropagationPolicy changes. clusterPropagationPolicyGVR := schema.GroupVersionResource{ Group: policyv1alpha1.GroupVersion.Group, Version: policyv1alpha1.GroupVersion.Version, Resource: "clusterpropagationpolicies", } - clusterPolicyHandler := informermanager.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, d.OnClusterPropagationPolicyDelete) - d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler) + d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR) d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR) // setup binding reconcile worker @@ -134,7 +115,9 @@ func (d *ResourceDetector) Start(ctx context.Context) error { clusterBindingHandler := informermanager.NewHandlerOnEvents(d.OnClusterResourceBindingAdd, d.OnClusterResourceBindingUpdate, nil) d.InformerManager.ForResource(clusterResourceBindingGVR, clusterBindingHandler) - d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete) + d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete) + d.InformerManager.ForResource(propagationPolicyGVR, d.EventHandler) + d.InformerManager.ForResource(clusterPropagationPolicyGVR, d.EventHandler) d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile) d.Processor.Run(1, d.stopCh) go d.discoverResources(30 * time.Second) @@ -145,8 +128,10 @@ func (d *ResourceDetector) Start(ctx context.Context) error { } // Check if our ResourceDetector implements necessary interfaces -var _ manager.Runnable = &ResourceDetector{} -var _ manager.LeaderElectionRunnable = &ResourceDetector{} +var ( + _ manager.Runnable = &ResourceDetector{} + _ manager.LeaderElectionRunnable = &ResourceDetector{} +) func (d *ResourceDetector) discoverResources(period time.Duration) { wait.Until(func() { @@ -204,8 +189,20 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { klog.Error("invalid key") return fmt.Errorf("invalid key") } - klog.Infof("Reconciling object: %s", clusterWideKey) + if d.SkippedFromPropagating(clusterWideKey) { + if clusterWideKey.Group == policyv1alpha1.GroupName { + switch clusterWideKey.Kind { + case "PropagationPolicy": + return d.ReconcilePropagationPolicy(key) + case "ClusterPropagationPolicy": + return d.ReconcileClusterPropagationPolicy(key) + } + } + return nil + } + + klog.Infof("Reconciling object: %s", clusterWideKey) object, err := d.GetUnstructuredObject(clusterWideKey) if err != nil { if apierrors.IsNotFound(err) { @@ -253,7 +250,7 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { return nil } -// EventFilter tells if an object should be take care of. +// SkippedFromPropagating tells if an object should be propagated. // // All objects under Kubernetes reserved namespace should be ignored: // - kube-system @@ -266,6 +263,8 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { // All objects which API group defined by Karmada should be ignored: // - cluster.karmada.io // - policy.karmada.io +// - work.karmada.io +// - config.karmada.io // // The api objects listed above will be ignored by default, as we don't want users to manually input the things // they don't care when trying to skip something else. @@ -274,42 +273,32 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { // the specified apis will be ignored as well. // // If '--skipped-propagating-namespaces' is specified, all APIs in the skipped-propagating-namespaces will be ignored. -func (d *ResourceDetector) EventFilter(obj interface{}) bool { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return false - } - - clusterWideKey, ok := key.(keys.ClusterWideKey) - if !ok { - klog.Errorf("Invalid key") - return false - } - +func (d *ResourceDetector) SkippedFromPropagating(clusterWideKey keys.ClusterWideKey) bool { if names.IsReservedNamespace(clusterWideKey.Namespace) { - return false + return true } if d.SkippedResourceConfig != nil { if d.SkippedResourceConfig.GroupDisabled(clusterWideKey.Group) { - klog.V(4).Infof("Skip event for %s", clusterWideKey.Group) - return false + klog.V(4).Infof("Skip propagating %s", clusterWideKey.Group) + return true } if d.SkippedResourceConfig.GroupVersionDisabled(clusterWideKey.GroupVersion()) { - klog.V(4).Infof("Skip event for %s", clusterWideKey.GroupVersion()) - return false + klog.V(4).Infof("Skip propagating %s", clusterWideKey.GroupVersion()) + return true } if d.SkippedResourceConfig.GroupVersionKindDisabled(clusterWideKey.GroupVersionKind()) { - klog.V(4).Infof("Skip event for %s", clusterWideKey.GroupVersionKind()) - return false + klog.V(4).Infof("Skip propagating %s", clusterWideKey.GroupVersionKind()) + return true } } // if SkippedPropagatingNamespaces is set, skip object events in these namespaces. if _, ok := d.SkippedPropagatingNamespaces[clusterWideKey.Namespace]; ok { - return false + klog.V(4).Infof("Skip propagating resources in %s", clusterWideKey.Namespace) + return true } - return true + return false } // OnAdd handles object add event and push the object to queue. @@ -504,7 +493,6 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured, bindingCopy.Spec.Replicas = binding.Spec.Replicas return nil }) - if err != nil { klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err) return err @@ -534,7 +522,6 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured, bindingCopy.Spec.Replicas = binding.Spec.Replicas return nil }) - if err != nil { klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err) return err @@ -742,33 +729,6 @@ func (d *ResourceDetector) GetMatching(resourceSelectors []policyv1alpha1.Resour return matchedResult } -// OnPropagationPolicyAdd handles object add event and push the object to queue. -func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return - } - - klog.V(2).Infof("Create PropagationPolicy(%s)", key) - d.policyReconcileWorker.Add(key) -} - -// OnPropagationPolicyUpdate handles object update event and push the object to queue. -func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) { - // currently do nothing, since a policy's resource selector can not be updated. -} - -// OnPropagationPolicyDelete handles object delete event and push the object to queue. -func (d *ResourceDetector) OnPropagationPolicyDelete(obj interface{}) { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return - } - - klog.V(2).Infof("Delete PropagationPolicy(%s)", key) - d.policyReconcileWorker.Add(key) -} - // ReconcilePropagationPolicy handles PropagationPolicy resource changes. // When adding a PropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and // put the object to queue. @@ -800,33 +760,6 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error { return d.HandlePropagationPolicyCreation(propagationObject) } -// OnClusterPropagationPolicyAdd handles object add event and push the object to queue. -func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return - } - - klog.V(2).Infof("Create ClusterPropagationPolicy(%s)", key) - d.clusterPolicyReconcileWorker.Add(key) -} - -// OnClusterPropagationPolicyUpdate handles object update event and push the object to queue. -func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj interface{}) { - // currently do nothing, since a policy's resource selector can not be updated. -} - -// OnClusterPropagationPolicyDelete handles object delete event and push the object to queue. -func (d *ResourceDetector) OnClusterPropagationPolicyDelete(obj interface{}) { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return - } - - klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key) - d.clusterPolicyReconcileWorker.Add(key) -} - // ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes. // When adding a ClusterPropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and // put the object to queue. @@ -1063,7 +996,6 @@ func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error { // OnClusterResourceBindingAdd handles object add event. func (d *ResourceDetector) OnClusterResourceBindingAdd(obj interface{}) { - } // OnClusterResourceBindingUpdate handles object update event and push the object to queue.