From 5a96bbfe1d637a61b722fad155353526815152cd Mon Sep 17 00:00:00 2001 From: dddddai Date: Sun, 13 Feb 2022 16:47:43 +0800 Subject: [PATCH 1/2] Revert "bugfix: resource binding is not created" This reverts commit c770cc10f39975dc25c3c9c1136aa1af8760050d. Signed-off-by: dddddai --- pkg/detector/detector.go | 140 +++++++++++++++++++++++++++++---------- 1 file changed, 104 insertions(+), 36 deletions(-) diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index e01e36f19781..971900bb2566 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -55,8 +55,14 @@ type ResourceDetector struct { // ResourceInterpreter knows the details of resource structure. ResourceInterpreter resourceinterpreter.ResourceInterpreter EventRecorder record.EventRecorder + // policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and + // a reconcile function to consume the items in queue. + policyReconcileWorker util.AsyncWorker + propagationPolicyLister cache.GenericLister - propagationPolicyLister cache.GenericLister + // clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and + // a reconcile function to consume the items in queue. + clusterPolicyReconcileWorker util.AsyncWorker clusterPropagationPolicyLister cache.GenericLister // bindingReconcileWorker maintains a rate limited queue which used to store ResourceBinding's key and @@ -80,17 +86,30 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.waitingObjects = make(map[keys.ClusterWideKey]struct{}) d.stopCh = ctx.Done() + // setup policy reconcile worker + d.policyReconcileWorker = util.NewAsyncWorker("propagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcilePropagationPolicy) + d.policyReconcileWorker.Run(1, d.stopCh) + d.clusterPolicyReconcileWorker = util.NewAsyncWorker("clusterPropagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcileClusterPropagationPolicy) + d.clusterPolicyReconcileWorker.Run(1, d.stopCh) + + // watch and enqueue PropagationPolicy changes. propagationPolicyGVR := schema.GroupVersionResource{ Group: policyv1alpha1.GroupVersion.Group, Version: policyv1alpha1.GroupVersion.Version, Resource: "propagationpolicies", } + policyHandler := informermanager.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, d.OnPropagationPolicyDelete) + d.InformerManager.ForResource(propagationPolicyGVR, policyHandler) + d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR) + + // watch and enqueue ClusterPropagationPolicy changes. clusterPropagationPolicyGVR := schema.GroupVersionResource{ Group: policyv1alpha1.GroupVersion.Group, Version: policyv1alpha1.GroupVersion.Version, Resource: "clusterpropagationpolicies", } - d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR) + clusterPolicyHandler := informermanager.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, d.OnClusterPropagationPolicyDelete) + d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler) d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR) // setup binding reconcile worker @@ -116,9 +135,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error { clusterBindingHandler := informermanager.NewHandlerOnEvents(d.OnClusterResourceBindingAdd, d.OnClusterResourceBindingUpdate, nil) d.InformerManager.ForResource(clusterResourceBindingGVR, clusterBindingHandler) - d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete) - d.InformerManager.ForResource(propagationPolicyGVR, d.EventHandler) - d.InformerManager.ForResource(clusterPropagationPolicyGVR, d.EventHandler) + d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete) d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile) d.Processor.Run(1, d.stopCh) go d.discoverResources(30 * time.Second) @@ -129,10 +146,8 @@ func (d *ResourceDetector) Start(ctx context.Context) error { } // Check if our ResourceDetector implements necessary interfaces -var ( - _ manager.Runnable = &ResourceDetector{} - _ manager.LeaderElectionRunnable = &ResourceDetector{} -) +var _ manager.Runnable = &ResourceDetector{} +var _ manager.LeaderElectionRunnable = &ResourceDetector{} func (d *ResourceDetector) discoverResources(period time.Duration) { wait.Until(func() { @@ -190,20 +205,8 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { klog.Error("invalid key") return fmt.Errorf("invalid key") } - - if d.SkippedFromPropagating(clusterWideKey) { - if clusterWideKey.Group == policyv1alpha1.GroupName { - switch clusterWideKey.Kind { - case "PropagationPolicy": - return d.ReconcilePropagationPolicy(key) - case "ClusterPropagationPolicy": - return d.ReconcileClusterPropagationPolicy(key) - } - } - return nil - } - klog.Infof("Reconciling object: %s", clusterWideKey) + object, err := d.GetUnstructuredObject(clusterWideKey) if err != nil { if apierrors.IsNotFound(err) { @@ -251,7 +254,7 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { return nil } -// SkippedFromPropagating tells if an object should be propagated. +// EventFilter tells if an object should be take care of. // // All objects under Kubernetes reserved namespace should be ignored: // - kube-system @@ -264,8 +267,6 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { // All objects which API group defined by Karmada should be ignored: // - cluster.karmada.io // - policy.karmada.io -// - work.karmada.io -// - config.karmada.io // // The api objects listed above will be ignored by default, as we don't want users to manually input the things // they don't care when trying to skip something else. @@ -274,32 +275,42 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { // the specified apis will be ignored as well. // // If '--skipped-propagating-namespaces' is specified, all APIs in the skipped-propagating-namespaces will be ignored. -func (d *ResourceDetector) SkippedFromPropagating(clusterWideKey keys.ClusterWideKey) bool { +func (d *ResourceDetector) EventFilter(obj interface{}) bool { + key, err := ClusterWideKeyFunc(obj) + if err != nil { + return false + } + + clusterWideKey, ok := key.(keys.ClusterWideKey) + if !ok { + klog.Errorf("Invalid key") + return false + } + if names.IsReservedNamespace(clusterWideKey.Namespace) { - return true + return false } if d.SkippedResourceConfig != nil { if d.SkippedResourceConfig.GroupDisabled(clusterWideKey.Group) { - klog.V(4).Infof("Skip propagating %s", clusterWideKey.Group) - return true + klog.V(4).Infof("Skip event for %s", clusterWideKey.Group) + return false } if d.SkippedResourceConfig.GroupVersionDisabled(clusterWideKey.GroupVersion()) { - klog.V(4).Infof("Skip propagating %s", clusterWideKey.GroupVersion()) - return true + klog.V(4).Infof("Skip event for %s", clusterWideKey.GroupVersion()) + return false } if d.SkippedResourceConfig.GroupVersionKindDisabled(clusterWideKey.GroupVersionKind()) { - klog.V(4).Infof("Skip propagating %s", clusterWideKey.GroupVersionKind()) - return true + klog.V(4).Infof("Skip event for %s", clusterWideKey.GroupVersionKind()) + return false } } // if SkippedPropagatingNamespaces is set, skip object events in these namespaces. if _, ok := d.SkippedPropagatingNamespaces[clusterWideKey.Namespace]; ok { - klog.V(4).Infof("Skip propagating resources in %s", clusterWideKey.Namespace) - return true + return false } - return false + return true } // OnAdd handles object add event and push the object to queue. @@ -508,6 +519,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured, } return nil }) + if err != nil { klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err) return err @@ -537,6 +549,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured, bindingCopy.Spec.Replicas = binding.Spec.Replicas return nil }) + if err != nil { klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err) return err @@ -744,6 +757,33 @@ func (d *ResourceDetector) GetMatching(resourceSelectors []policyv1alpha1.Resour return matchedResult } +// OnPropagationPolicyAdd handles object add event and push the object to queue. +func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) { + key, err := ClusterWideKeyFunc(obj) + if err != nil { + return + } + + klog.V(2).Infof("Create PropagationPolicy(%s)", key) + d.policyReconcileWorker.Add(key) +} + +// OnPropagationPolicyUpdate handles object update event and push the object to queue. +func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) { + // currently do nothing, since a policy's resource selector can not be updated. +} + +// OnPropagationPolicyDelete handles object delete event and push the object to queue. +func (d *ResourceDetector) OnPropagationPolicyDelete(obj interface{}) { + key, err := ClusterWideKeyFunc(obj) + if err != nil { + return + } + + klog.V(2).Infof("Delete PropagationPolicy(%s)", key) + d.policyReconcileWorker.Add(key) +} + // ReconcilePropagationPolicy handles PropagationPolicy resource changes. // When adding a PropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and // put the object to queue. @@ -775,6 +815,33 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error { return d.HandlePropagationPolicyCreation(propagationObject) } +// OnClusterPropagationPolicyAdd handles object add event and push the object to queue. +func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) { + key, err := ClusterWideKeyFunc(obj) + if err != nil { + return + } + + klog.V(2).Infof("Create ClusterPropagationPolicy(%s)", key) + d.clusterPolicyReconcileWorker.Add(key) +} + +// OnClusterPropagationPolicyUpdate handles object update event and push the object to queue. +func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj interface{}) { + // currently do nothing, since a policy's resource selector can not be updated. +} + +// OnClusterPropagationPolicyDelete handles object delete event and push the object to queue. +func (d *ResourceDetector) OnClusterPropagationPolicyDelete(obj interface{}) { + key, err := ClusterWideKeyFunc(obj) + if err != nil { + return + } + + klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key) + d.clusterPolicyReconcileWorker.Add(key) +} + // ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes. // When adding a ClusterPropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and // put the object to queue. @@ -1011,6 +1078,7 @@ func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error { // OnClusterResourceBindingAdd handles object add event. func (d *ResourceDetector) OnClusterResourceBindingAdd(obj interface{}) { + } // OnClusterResourceBindingUpdate handles object update event and push the object to queue. From 323c4c07daa710ff17a0a751c00bc7abd6805cb0 Mon Sep 17 00:00:00 2001 From: dddddai Date: Tue, 15 Feb 2022 19:52:32 +0800 Subject: [PATCH 2/2] Rework "bugfix: resource binding not created occasionally" Signed-off-by: dddddai --- pkg/detector/detector.go | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index 971900bb2566..ba3b0e13092b 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -146,8 +146,10 @@ func (d *ResourceDetector) Start(ctx context.Context) error { } // Check if our ResourceDetector implements necessary interfaces -var _ manager.Runnable = &ResourceDetector{} -var _ manager.LeaderElectionRunnable = &ResourceDetector{} +var ( + _ manager.Runnable = &ResourceDetector{} + _ manager.LeaderElectionRunnable = &ResourceDetector{} +) func (d *ResourceDetector) discoverResources(period time.Duration) { wait.Until(func() { @@ -233,7 +235,7 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { klog.Infof("Waiting for dependent overrides present for policy(%s/%s)", propagationPolicy.Namespace, propagationPolicy.Name) return fmt.Errorf("waiting for dependent overrides") } - + d.RemoveWaiting(clusterWideKey) return d.ApplyPolicy(object, clusterWideKey, propagationPolicy) } @@ -244,14 +246,20 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { return err } if clusterPolicy != nil { + d.RemoveWaiting(clusterWideKey) return d.ApplyClusterPolicy(object, clusterWideKey, clusterPolicy) } - d.EventRecorder.Event(object, corev1.EventTypeWarning, workv1alpha2.EventReasonApplyPolicyFailed, "No policy match for resource") - // reaching here mean there is no appropriate policy for the object, put it into waiting list. - d.AddWaiting(clusterWideKey) + if d.isWaiting(clusterWideKey) { + // reaching here means there is no appropriate policy for the object + d.EventRecorder.Event(object, corev1.EventTypeWarning, workv1alpha2.EventReasonApplyPolicyFailed, "No policy match for resource") + return nil + } - return nil + // put it into waiting list and retry once in case the resource and propagation policy come at the same time + // see https://github.com/karmada-io/karmada/issues/1195 + d.AddWaiting(clusterWideKey) + return fmt.Errorf("no matched propagation policy") } // EventFilter tells if an object should be take care of. @@ -549,7 +557,6 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured, bindingCopy.Spec.Replicas = binding.Spec.Replicas return nil }) - if err != nil { klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err) return err @@ -714,6 +721,14 @@ func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unst return binding, nil } +// isWaiting indicates if the object is in waiting list. +func (d *ResourceDetector) isWaiting(objectKey keys.ClusterWideKey) bool { + d.waitingLock.RLock() + _, ok := d.waitingObjects[objectKey] + d.waitingLock.RUnlock() + return ok +} + // AddWaiting adds object's key to waiting list. func (d *ResourceDetector) AddWaiting(objectKey keys.ClusterWideKey) { d.waitingLock.Lock() @@ -1078,7 +1093,6 @@ func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error { // OnClusterResourceBindingAdd handles object add event. func (d *ResourceDetector) OnClusterResourceBindingAdd(obj interface{}) { - } // OnClusterResourceBindingUpdate handles object update event and push the object to queue.