diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index e01e36f19781..ba3b0e13092b 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -55,8 +55,14 @@ type ResourceDetector struct { // ResourceInterpreter knows the details of resource structure. ResourceInterpreter resourceinterpreter.ResourceInterpreter EventRecorder record.EventRecorder + // policyReconcileWorker maintains a rate limited queue which is used to store PropagationPolicy's key and + // a reconcile function to consume the items in queue. + policyReconcileWorker util.AsyncWorker + propagationPolicyLister cache.GenericLister - propagationPolicyLister cache.GenericLister + // clusterPolicyReconcileWorker maintains a rate limited queue which is used to store ClusterPropagationPolicy's key and + // a reconcile function to consume the items in queue. + clusterPolicyReconcileWorker util.AsyncWorker clusterPropagationPolicyLister cache.GenericLister // bindingReconcileWorker maintains a rate limited queue which used to store ResourceBinding's key and @@ -80,17 +86,30 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.waitingObjects = make(map[keys.ClusterWideKey]struct{}) d.stopCh = ctx.Done() + // setup policy reconcile worker + d.policyReconcileWorker = util.NewAsyncWorker("propagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcilePropagationPolicy) + d.policyReconcileWorker.Run(1, d.stopCh) + d.clusterPolicyReconcileWorker = util.NewAsyncWorker("clusterPropagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcileClusterPropagationPolicy) + d.clusterPolicyReconcileWorker.Run(1, d.stopCh) + + // watch and enqueue PropagationPolicy changes.
propagationPolicyGVR := schema.GroupVersionResource{ Group: policyv1alpha1.GroupVersion.Group, Version: policyv1alpha1.GroupVersion.Version, Resource: "propagationpolicies", } + policyHandler := informermanager.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, d.OnPropagationPolicyDelete) + d.InformerManager.ForResource(propagationPolicyGVR, policyHandler) + d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR) + + // watch and enqueue ClusterPropagationPolicy changes. clusterPropagationPolicyGVR := schema.GroupVersionResource{ Group: policyv1alpha1.GroupVersion.Group, Version: policyv1alpha1.GroupVersion.Version, Resource: "clusterpropagationpolicies", } - d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR) + clusterPolicyHandler := informermanager.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, d.OnClusterPropagationPolicyDelete) + d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler) d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR) // setup binding reconcile worker @@ -116,9 +135,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error { clusterBindingHandler := informermanager.NewHandlerOnEvents(d.OnClusterResourceBindingAdd, d.OnClusterResourceBindingUpdate, nil) d.InformerManager.ForResource(clusterResourceBindingGVR, clusterBindingHandler) - d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete) - d.InformerManager.ForResource(propagationPolicyGVR, d.EventHandler) - d.InformerManager.ForResource(clusterPropagationPolicyGVR, d.EventHandler) + d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete) d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile) d.Processor.Run(1, d.stopCh) go d.discoverResources(30 * time.Second) @@ -190,20 +207,8 @@ func (d 
*ResourceDetector) Reconcile(key util.QueueKey) error { klog.Error("invalid key") return fmt.Errorf("invalid key") } - - if d.SkippedFromPropagating(clusterWideKey) { - if clusterWideKey.Group == policyv1alpha1.GroupName { - switch clusterWideKey.Kind { - case "PropagationPolicy": - return d.ReconcilePropagationPolicy(key) - case "ClusterPropagationPolicy": - return d.ReconcileClusterPropagationPolicy(key) - } - } - return nil - } - klog.Infof("Reconciling object: %s", clusterWideKey) + object, err := d.GetUnstructuredObject(clusterWideKey) if err != nil { if apierrors.IsNotFound(err) { @@ -230,7 +235,7 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { klog.Infof("Waiting for dependent overrides present for policy(%s/%s)", propagationPolicy.Namespace, propagationPolicy.Name) return fmt.Errorf("waiting for dependent overrides") } - + d.RemoveWaiting(clusterWideKey) return d.ApplyPolicy(object, clusterWideKey, propagationPolicy) } @@ -241,17 +246,23 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { return err } if clusterPolicy != nil { + d.RemoveWaiting(clusterWideKey) return d.ApplyClusterPolicy(object, clusterWideKey, clusterPolicy) } - d.EventRecorder.Event(object, corev1.EventTypeWarning, workv1alpha2.EventReasonApplyPolicyFailed, "No policy match for resource") - // reaching here mean there is no appropriate policy for the object, put it into waiting list. 
- d.AddWaiting(clusterWideKey) + if d.isWaiting(clusterWideKey) { + // reaching here means there is no appropriate policy for the object + d.EventRecorder.Event(object, corev1.EventTypeWarning, workv1alpha2.EventReasonApplyPolicyFailed, "No policy match for resource") + return nil + } - return nil + // put it into waiting list and retry once in case the resource and propagation policy come at the same time + // see https://github.com/karmada-io/karmada/issues/1195 + d.AddWaiting(clusterWideKey) + return fmt.Errorf("no matched propagation policy") } -// SkippedFromPropagating tells if an object should be propagated. +// EventFilter tells if an object should be taken care of. // // All objects under Kubernetes reserved namespace should be ignored: // - kube-system @@ -264,8 +275,6 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { // All objects which API group defined by Karmada should be ignored: // - cluster.karmada.io // - policy.karmada.io -// - work.karmada.io -// - config.karmada.io // // The api objects listed above will be ignored by default, as we don't want users to manually input the things // they don't care when trying to skip something else. @@ -274,32 +283,42 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { // the specified apis will be ignored as well. // // If '--skipped-propagating-namespaces' is specified, all APIs in the skipped-propagating-namespaces will be ignored.
-func (d *ResourceDetector) SkippedFromPropagating(clusterWideKey keys.ClusterWideKey) bool { +func (d *ResourceDetector) EventFilter(obj interface{}) bool { + key, err := ClusterWideKeyFunc(obj) + if err != nil { + return false + } + + clusterWideKey, ok := key.(keys.ClusterWideKey) + if !ok { + klog.Errorf("Invalid key") + return false + } + if names.IsReservedNamespace(clusterWideKey.Namespace) { - return true + return false } if d.SkippedResourceConfig != nil { if d.SkippedResourceConfig.GroupDisabled(clusterWideKey.Group) { - klog.V(4).Infof("Skip propagating %s", clusterWideKey.Group) - return true + klog.V(4).Infof("Skip event for %s", clusterWideKey.Group) + return false } if d.SkippedResourceConfig.GroupVersionDisabled(clusterWideKey.GroupVersion()) { - klog.V(4).Infof("Skip propagating %s", clusterWideKey.GroupVersion()) - return true + klog.V(4).Infof("Skip event for %s", clusterWideKey.GroupVersion()) + return false } if d.SkippedResourceConfig.GroupVersionKindDisabled(clusterWideKey.GroupVersionKind()) { - klog.V(4).Infof("Skip propagating %s", clusterWideKey.GroupVersionKind()) - return true + klog.V(4).Infof("Skip event for %s", clusterWideKey.GroupVersionKind()) + return false } } // if SkippedPropagatingNamespaces is set, skip object events in these namespaces. if _, ok := d.SkippedPropagatingNamespaces[clusterWideKey.Namespace]; ok { - klog.V(4).Infof("Skip propagating resources in %s", clusterWideKey.Namespace) - return true + return false } - return false + return true } // OnAdd handles object add event and push the object to queue. @@ -508,6 +527,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured, } return nil }) + if err != nil { klog.Errorf("Failed to apply cluster policy(%s) for object: %s. 
error: %v", policy.Name, objectKey, err) return err @@ -701,6 +721,14 @@ func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unst return binding, nil } +// isWaiting indicates if the object is in waiting list. +func (d *ResourceDetector) isWaiting(objectKey keys.ClusterWideKey) bool { + d.waitingLock.RLock() + _, ok := d.waitingObjects[objectKey] + d.waitingLock.RUnlock() + return ok +} + // AddWaiting adds object's key to waiting list. func (d *ResourceDetector) AddWaiting(objectKey keys.ClusterWideKey) { d.waitingLock.Lock() @@ -744,6 +772,33 @@ func (d *ResourceDetector) GetMatching(resourceSelectors []policyv1alpha1.Resour return matchedResult } +// OnPropagationPolicyAdd handles object add event and pushes the object's key to the queue. +func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) { + key, err := ClusterWideKeyFunc(obj) + if err != nil { + return + } + + klog.V(2).Infof("Create PropagationPolicy(%s)", key) + d.policyReconcileWorker.Add(key) +} + +// OnPropagationPolicyUpdate handles object update event; it is currently a no-op. +func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) { + // currently do nothing, since a policy's resource selector can not be updated. +} + +// OnPropagationPolicyDelete handles object delete event and pushes the object's key to the queue. +func (d *ResourceDetector) OnPropagationPolicyDelete(obj interface{}) { + key, err := ClusterWideKeyFunc(obj) + if err != nil { + return + } + + klog.V(2).Infof("Delete PropagationPolicy(%s)", key) + d.policyReconcileWorker.Add(key) +} + // ReconcilePropagationPolicy handles PropagationPolicy resource changes. // When adding a PropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and // put the object to queue.
@@ -775,6 +830,33 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error { return d.HandlePropagationPolicyCreation(propagationObject) } +// OnClusterPropagationPolicyAdd handles object add event and pushes the object's key to the queue. +func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) { + key, err := ClusterWideKeyFunc(obj) + if err != nil { + return + } + + klog.V(2).Infof("Create ClusterPropagationPolicy(%s)", key) + d.clusterPolicyReconcileWorker.Add(key) +} + +// OnClusterPropagationPolicyUpdate handles object update event; it is currently a no-op. +func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj interface{}) { + // currently do nothing, since a policy's resource selector can not be updated. +} + +// OnClusterPropagationPolicyDelete handles object delete event and pushes the object's key to the queue. +func (d *ResourceDetector) OnClusterPropagationPolicyDelete(obj interface{}) { + key, err := ClusterWideKeyFunc(obj) + if err != nil { + return + } + + klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key) + d.clusterPolicyReconcileWorker.Add(key) +} + // ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes. // When adding a ClusterPropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and // put the object to queue.