Skip to content

Commit

Permalink
bugfix: resource binding is not created
Browse files Browse the repository at this point in the history
reconcile PP/CPPs and resource templates in one goroutine

Signed-off-by: dddddai <[email protected]>
  • Loading branch information
dddddai committed Jan 22, 2022
1 parent 19a5fc5 commit 19694c3
Showing 1 changed file with 34 additions and 94 deletions.
128 changes: 34 additions & 94 deletions pkg/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,8 @@ type ResourceDetector struct {
// ResourceInterpreter knows the details of resource structure.
ResourceInterpreter resourceinterpreter.ResourceInterpreter
EventRecorder record.EventRecorder
// policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and
// a reconcile function to consume the items in queue.
policyReconcileWorker util.AsyncWorker
propagationPolicyLister cache.GenericLister

// clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and
// a reconcile function to consume the items in queue.
clusterPolicyReconcileWorker util.AsyncWorker
propagationPolicyLister cache.GenericLister
clusterPropagationPolicyLister cache.GenericLister

// bindingReconcileWorker maintains a rate limited queue which used to store ResourceBinding's key and
Expand All @@ -85,30 +79,17 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
d.waitingObjects = make(map[keys.ClusterWideKey]struct{})
d.stopCh = ctx.Done()

// setup policy reconcile worker
d.policyReconcileWorker = util.NewAsyncWorker("propagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcilePropagationPolicy)
d.policyReconcileWorker.Run(1, d.stopCh)
d.clusterPolicyReconcileWorker = util.NewAsyncWorker("clusterPropagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcileClusterPropagationPolicy)
d.clusterPolicyReconcileWorker.Run(1, d.stopCh)

// watch and enqueue PropagationPolicy changes.
propagationPolicyGVR := schema.GroupVersionResource{
Group: policyv1alpha1.GroupVersion.Group,
Version: policyv1alpha1.GroupVersion.Version,
Resource: "propagationpolicies",
}
policyHandler := informermanager.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, d.OnPropagationPolicyDelete)
d.InformerManager.ForResource(propagationPolicyGVR, policyHandler)
d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR)

// watch and enqueue ClusterPropagationPolicy changes.
clusterPropagationPolicyGVR := schema.GroupVersionResource{
Group: policyv1alpha1.GroupVersion.Group,
Version: policyv1alpha1.GroupVersion.Version,
Resource: "clusterpropagationpolicies",
}
clusterPolicyHandler := informermanager.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, d.OnClusterPropagationPolicyDelete)
d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler)
d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR)
d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR)

// setup binding reconcile worker
Expand All @@ -134,7 +115,9 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
clusterBindingHandler := informermanager.NewHandlerOnEvents(d.OnClusterResourceBindingAdd, d.OnClusterResourceBindingUpdate, nil)
d.InformerManager.ForResource(clusterResourceBindingGVR, clusterBindingHandler)

d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete)
d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
d.InformerManager.ForResource(propagationPolicyGVR, d.EventHandler)
d.InformerManager.ForResource(clusterPropagationPolicyGVR, d.EventHandler)
d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile)
d.Processor.Run(1, d.stopCh)
go d.discoverResources(30 * time.Second)
Expand All @@ -145,8 +128,10 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
}

// Check if our ResourceDetector implements necessary interfaces
var _ manager.Runnable = &ResourceDetector{}
var _ manager.LeaderElectionRunnable = &ResourceDetector{}
var (
_ manager.Runnable = &ResourceDetector{}
_ manager.LeaderElectionRunnable = &ResourceDetector{}
)

func (d *ResourceDetector) discoverResources(period time.Duration) {
wait.Until(func() {
Expand Down Expand Up @@ -206,6 +191,19 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
}
klog.Infof("Reconciling object: %s", clusterWideKey)

if clusterWideKey.Group == policyv1alpha1.GroupName {
switch clusterWideKey.Kind {
case "PropagationPolicy":
return d.ReconcilePropagationPolicy(key)
case "ClusterPropagationPolicy":
return d.ReconcileClusterPropagationPolicy(key)
}
}

if !d.EventFilter(clusterWideKey) {
return nil
}

object, err := d.GetUnstructuredObject(clusterWideKey)
if err != nil {
if apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -266,6 +264,8 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
// All objects which API group defined by Karmada should be ignored:
// - cluster.karmada.io
// - policy.karmada.io
// - work.karmada.io
// - config.karmada.io
//
// The api objects listed above will be ignored by default, as we don't want users to manually input the things
// they don't care when trying to skip something else.
Expand All @@ -274,18 +274,7 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
// the specified apis will be ignored as well.
//
// If '--skipped-propagating-namespaces' is specified, all APIs in the skipped-propagating-namespaces will be ignored.
func (d *ResourceDetector) EventFilter(obj interface{}) bool {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return false
}

clusterWideKey, ok := key.(keys.ClusterWideKey)
if !ok {
klog.Errorf("Invalid key")
return false
}

func (d *ResourceDetector) EventFilter(clusterWideKey keys.ClusterWideKey) bool {
if names.IsReservedNamespace(clusterWideKey.Namespace) {
return false
}
Expand Down Expand Up @@ -323,7 +312,15 @@ func (d *ResourceDetector) OnAdd(obj interface{}) {

// OnUpdate handles object update event and push the object to queue.
func (d *ResourceDetector) OnUpdate(oldObj, newObj interface{}) {
d.OnAdd(newObj)
switch newObj.(type) {
case *policyv1alpha1.PropagationPolicy:
// currently do nothing, since a policy's resource selector can not be updated.
return
case *policyv1alpha1.ClusterPropagationPolicy:
return
default:
d.OnAdd(newObj)
}
}

// OnDelete handles object delete event and push the object to queue.
Expand Down Expand Up @@ -504,7 +501,6 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
bindingCopy.Spec.Replicas = binding.Spec.Replicas
return nil
})

if err != nil {
klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err)
return err
Expand Down Expand Up @@ -534,7 +530,6 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
bindingCopy.Spec.Replicas = binding.Spec.Replicas
return nil
})

if err != nil {
klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err)
return err
Expand Down Expand Up @@ -742,33 +737,6 @@ func (d *ResourceDetector) GetMatching(resourceSelectors []policyv1alpha1.Resour
return matchedResult
}

// OnPropagationPolicyAdd handles object add event and push the object to queue.
func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return
}

klog.V(2).Infof("Create PropagationPolicy(%s)", key)
d.policyReconcileWorker.Add(key)
}

// OnPropagationPolicyUpdate handles object update event and push the object to queue.
func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) {
// currently do nothing, since a policy's resource selector can not be updated.
}

// OnPropagationPolicyDelete handles object delete event and push the object to queue.
func (d *ResourceDetector) OnPropagationPolicyDelete(obj interface{}) {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return
}

klog.V(2).Infof("Delete PropagationPolicy(%s)", key)
d.policyReconcileWorker.Add(key)
}

// ReconcilePropagationPolicy handles PropagationPolicy resource changes.
// When adding a PropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and
// put the object to queue.
Expand Down Expand Up @@ -800,33 +768,6 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error {
return d.HandlePropagationPolicyCreation(propagationObject)
}

// OnClusterPropagationPolicyAdd handles object add event and push the object to queue.
func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return
}

klog.V(2).Infof("Create ClusterPropagationPolicy(%s)", key)
d.clusterPolicyReconcileWorker.Add(key)
}

// OnClusterPropagationPolicyUpdate handles object update event and push the object to queue.
func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj interface{}) {
// currently do nothing, since a policy's resource selector can not be updated.
}

// OnClusterPropagationPolicyDelete handles object delete event and push the object to queue.
func (d *ResourceDetector) OnClusterPropagationPolicyDelete(obj interface{}) {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return
}

klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key)
d.clusterPolicyReconcileWorker.Add(key)
}

// ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes.
// When adding a ClusterPropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and
// put the object to queue.
Expand Down Expand Up @@ -1063,7 +1004,6 @@ func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error {

// OnClusterResourceBindingAdd handles object add event.
func (d *ResourceDetector) OnClusterResourceBindingAdd(obj interface{}) {

}

// OnClusterResourceBindingUpdate handles object update event and push the object to queue.
Expand Down

0 comments on commit 19694c3

Please sign in to comment.