diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index 3bb2ac971f41..f5cf4a525aea 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -54,14 +54,8 @@ type ResourceDetector struct { // ResourceInterpreter knows the details of resource structure. ResourceInterpreter resourceinterpreter.ResourceInterpreter EventRecorder record.EventRecorder - // policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and - // a reconcile function to consume the items in queue. - policyReconcileWorker util.AsyncWorker - propagationPolicyLister cache.GenericLister - // clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and - // a reconcile function to consume the items in queue. - clusterPolicyReconcileWorker util.AsyncWorker + propagationPolicyLister cache.GenericLister clusterPropagationPolicyLister cache.GenericLister // bindingReconcileWorker maintains a rate limited queue which used to store ResourceBinding's key and @@ -85,30 +79,17 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.waitingObjects = make(map[keys.ClusterWideKey]struct{}) d.stopCh = ctx.Done() - // setup policy reconcile worker - d.policyReconcileWorker = util.NewAsyncWorker("propagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcilePropagationPolicy) - d.policyReconcileWorker.Run(1, d.stopCh) - d.clusterPolicyReconcileWorker = util.NewAsyncWorker("clusterPropagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcileClusterPropagationPolicy) - d.clusterPolicyReconcileWorker.Run(1, d.stopCh) - - // watch and enqueue PropagationPolicy changes. propagationPolicyGVR := schema.GroupVersionResource{ Group: policyv1alpha1.GroupVersion.Group, Version: policyv1alpha1.GroupVersion.Version, Resource: "propagationpolicies", } - policyHandler := informermanager.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, d.OnPropagationPolicyDelete) - d.InformerManager.ForResource(propagationPolicyGVR, policyHandler) - d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR) - - // watch and enqueue ClusterPropagationPolicy changes. clusterPropagationPolicyGVR := schema.GroupVersionResource{ Group: policyv1alpha1.GroupVersion.Group, Version: policyv1alpha1.GroupVersion.Version, Resource: "clusterpropagationpolicies", } - clusterPolicyHandler := informermanager.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, d.OnClusterPropagationPolicyDelete) - d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler) + d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR) d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR) // setup binding reconcile worker @@ -134,7 +115,9 @@ func (d *ResourceDetector) Start(ctx context.Context) error { clusterBindingHandler := informermanager.NewHandlerOnEvents(d.OnClusterResourceBindingAdd, d.OnClusterResourceBindingUpdate, nil) d.InformerManager.ForResource(clusterResourceBindingGVR, clusterBindingHandler) - d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete) + d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete) + d.InformerManager.ForResource(propagationPolicyGVR, d.EventHandler) + d.InformerManager.ForResource(clusterPropagationPolicyGVR, d.EventHandler) d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile) d.Processor.Run(1, d.stopCh) go d.discoverResources(30 * time.Second) @@ -145,8 +128,10 @@ func (d *ResourceDetector) Start(ctx context.Context) error { } // Check if our ResourceDetector implements necessary interfaces -var _ manager.Runnable = &ResourceDetector{} -var _ manager.LeaderElectionRunnable = &ResourceDetector{} +var ( + _ manager.Runnable = &ResourceDetector{} + _ manager.LeaderElectionRunnable = &ResourceDetector{} +) func (d *ResourceDetector) discoverResources(period time.Duration) { wait.Until(func() { @@ -206,6 +191,19 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { } klog.Infof("Reconciling object: %s", clusterWideKey) + if clusterWideKey.Group == policyv1alpha1.GroupName { + switch clusterWideKey.Kind { + case "PropagationPolicy": + return d.ReconcilePropagationPolicy(key) + case "ClusterPropagationPolicy": + return d.ReconcileClusterPropagationPolicy(key) + } + } + + if !d.EventFilter(clusterWideKey) { + return nil + } + object, err := d.GetUnstructuredObject(clusterWideKey) if err != nil { if apierrors.IsNotFound(err) { @@ -266,6 +264,8 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { // All objects which API group defined by Karmada should be ignored: // - cluster.karmada.io // - policy.karmada.io +// - work.karmada.io +// - config.karmada.io // // The api objects listed above will be ignored by default, as we don't want users to manually input the things // they don't care when trying to skip something else. @@ -274,18 +274,7 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error { // the specified apis will be ignored as well. // // If '--skipped-propagating-namespaces' is specified, all APIs in the skipped-propagating-namespaces will be ignored. -func (d *ResourceDetector) EventFilter(obj interface{}) bool { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return false - } - - clusterWideKey, ok := key.(keys.ClusterWideKey) - if !ok { - klog.Errorf("Invalid key") - return false - } - +func (d *ResourceDetector) EventFilter(clusterWideKey keys.ClusterWideKey) bool { if names.IsReservedNamespace(clusterWideKey.Namespace) { return false } @@ -323,7 +312,15 @@ func (d *ResourceDetector) OnAdd(obj interface{}) { // OnUpdate handles object update event and push the object to queue. func (d *ResourceDetector) OnUpdate(oldObj, newObj interface{}) { - d.OnAdd(newObj) + switch newObj.(type) { + case *policyv1alpha1.PropagationPolicy: + // currently do nothing, since a policy's resource selector can not be updated. + return + case *policyv1alpha1.ClusterPropagationPolicy: + return + default: + d.OnAdd(newObj) + } } // OnDelete handles object delete event and push the object to queue. @@ -504,7 +501,6 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured, bindingCopy.Spec.Replicas = binding.Spec.Replicas return nil }) - if err != nil { klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err) return err @@ -534,7 +530,6 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured, bindingCopy.Spec.Replicas = binding.Spec.Replicas return nil }) - if err != nil { klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err) return err @@ -742,33 +737,6 @@ func (d *ResourceDetector) GetMatching(resourceSelectors []policyv1alpha1.Resour return matchedResult } -// OnPropagationPolicyAdd handles object add event and push the object to queue. -func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return - } - - klog.V(2).Infof("Create PropagationPolicy(%s)", key) - d.policyReconcileWorker.Add(key) -} - -// OnPropagationPolicyUpdate handles object update event and push the object to queue. -func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) { - // currently do nothing, since a policy's resource selector can not be updated. -} - -// OnPropagationPolicyDelete handles object delete event and push the object to queue. -func (d *ResourceDetector) OnPropagationPolicyDelete(obj interface{}) { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return - } - - klog.V(2).Infof("Delete PropagationPolicy(%s)", key) - d.policyReconcileWorker.Add(key) -} - // ReconcilePropagationPolicy handles PropagationPolicy resource changes. // When adding a PropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and // put the object to queue. @@ -800,33 +768,6 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error { return d.HandlePropagationPolicyCreation(propagationObject) } -// OnClusterPropagationPolicyAdd handles object add event and push the object to queue. -func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return - } - - klog.V(2).Infof("Create ClusterPropagationPolicy(%s)", key) - d.clusterPolicyReconcileWorker.Add(key) -} - -// OnClusterPropagationPolicyUpdate handles object update event and push the object to queue. -func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj interface{}) { - // currently do nothing, since a policy's resource selector can not be updated. -} - -// OnClusterPropagationPolicyDelete handles object delete event and push the object to queue. -func (d *ResourceDetector) OnClusterPropagationPolicyDelete(obj interface{}) { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return - } - - klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key) - d.clusterPolicyReconcileWorker.Add(key) -} - // ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes. // When adding a ClusterPropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and // put the object to queue. @@ -1063,7 +1004,6 @@ func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error { // OnClusterResourceBindingAdd handles object add event. func (d *ResourceDetector) OnClusterResourceBindingAdd(obj interface{}) { - } // OnClusterResourceBindingUpdate handles object update event and push the object to queue.