Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework "bugfix: resource binding not created occasionally" #1368

Merged
merged 2 commits into from
Feb 19, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 119 additions & 37 deletions pkg/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,14 @@ type ResourceDetector struct {
// ResourceInterpreter knows the details of resource structure.
ResourceInterpreter resourceinterpreter.ResourceInterpreter
EventRecorder record.EventRecorder
// policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and
// a reconcile function to consume the items in queue.
policyReconcileWorker util.AsyncWorker
propagationPolicyLister cache.GenericLister

propagationPolicyLister cache.GenericLister
// clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and
// a reconcile function to consume the items in queue.
clusterPolicyReconcileWorker util.AsyncWorker
clusterPropagationPolicyLister cache.GenericLister

// bindingReconcileWorker maintains a rate limited queue which used to store ResourceBinding's key and
Expand All @@ -80,17 +86,30 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
d.waitingObjects = make(map[keys.ClusterWideKey]struct{})
d.stopCh = ctx.Done()

// setup policy reconcile worker
d.policyReconcileWorker = util.NewAsyncWorker("propagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcilePropagationPolicy)
d.policyReconcileWorker.Run(1, d.stopCh)
d.clusterPolicyReconcileWorker = util.NewAsyncWorker("clusterPropagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcileClusterPropagationPolicy)
d.clusterPolicyReconcileWorker.Run(1, d.stopCh)

// watch and enqueue PropagationPolicy changes.
propagationPolicyGVR := schema.GroupVersionResource{
Group: policyv1alpha1.GroupVersion.Group,
Version: policyv1alpha1.GroupVersion.Version,
Resource: "propagationpolicies",
}
policyHandler := informermanager.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, d.OnPropagationPolicyDelete)
d.InformerManager.ForResource(propagationPolicyGVR, policyHandler)
d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR)

// watch and enqueue ClusterPropagationPolicy changes.
clusterPropagationPolicyGVR := schema.GroupVersionResource{
Group: policyv1alpha1.GroupVersion.Group,
Version: policyv1alpha1.GroupVersion.Version,
Resource: "clusterpropagationpolicies",
}
d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR)
clusterPolicyHandler := informermanager.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, d.OnClusterPropagationPolicyDelete)
d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler)
d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR)

// setup binding reconcile worker
Expand All @@ -116,9 +135,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
clusterBindingHandler := informermanager.NewHandlerOnEvents(d.OnClusterResourceBindingAdd, d.OnClusterResourceBindingUpdate, nil)
d.InformerManager.ForResource(clusterResourceBindingGVR, clusterBindingHandler)

d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
d.InformerManager.ForResource(propagationPolicyGVR, d.EventHandler)
d.InformerManager.ForResource(clusterPropagationPolicyGVR, d.EventHandler)
d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete)
d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile)
d.Processor.Run(1, d.stopCh)
go d.discoverResources(30 * time.Second)
Expand Down Expand Up @@ -190,20 +207,8 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
klog.Error("invalid key")
return fmt.Errorf("invalid key")
}

if d.SkippedFromPropagating(clusterWideKey) {
if clusterWideKey.Group == policyv1alpha1.GroupName {
switch clusterWideKey.Kind {
case "PropagationPolicy":
return d.ReconcilePropagationPolicy(key)
case "ClusterPropagationPolicy":
return d.ReconcileClusterPropagationPolicy(key)
}
}
return nil
}

klog.Infof("Reconciling object: %s", clusterWideKey)

object, err := d.GetUnstructuredObject(clusterWideKey)
if err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -230,7 +235,7 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
klog.Infof("Waiting for dependent overrides present for policy(%s/%s)", propagationPolicy.Namespace, propagationPolicy.Name)
return fmt.Errorf("waiting for dependent overrides")
}

d.RemoveWaiting(clusterWideKey)
return d.ApplyPolicy(object, clusterWideKey, propagationPolicy)
}

Expand All @@ -241,17 +246,23 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
return err
}
if clusterPolicy != nil {
d.RemoveWaiting(clusterWideKey)
return d.ApplyClusterPolicy(object, clusterWideKey, clusterPolicy)
}

d.EventRecorder.Event(object, corev1.EventTypeWarning, workv1alpha2.EventReasonApplyPolicyFailed, "No policy match for resource")
// reaching here mean there is no appropriate policy for the object, put it into waiting list.
d.AddWaiting(clusterWideKey)
if d.isWaiting(clusterWideKey) {
// reaching here means there is no appropriate policy for the object
d.EventRecorder.Event(object, corev1.EventTypeWarning, workv1alpha2.EventReasonApplyPolicyFailed, "No policy match for resource")
return nil
}

return nil
// put it into waiting list and retry once in case the resource and propagation policy come at the same time
// see https://github.com/karmada-io/karmada/issues/1195
d.AddWaiting(clusterWideKey)
return fmt.Errorf("no matched propagation policy")
}

// SkippedFromPropagating tells if an object should be propagated.
// EventFilter tells if an object should be take care of.
//
// All objects under Kubernetes reserved namespace should be ignored:
// - kube-system
Expand All @@ -264,8 +275,6 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
// All objects which API group defined by Karmada should be ignored:
// - cluster.karmada.io
// - policy.karmada.io
// - work.karmada.io
// - config.karmada.io
//
// The api objects listed above will be ignored by default, as we don't want users to manually input the things
// they don't care when trying to skip something else.
Expand All @@ -274,32 +283,42 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
// the specified apis will be ignored as well.
//
// If '--skipped-propagating-namespaces' is specified, all APIs in the skipped-propagating-namespaces will be ignored.
func (d *ResourceDetector) SkippedFromPropagating(clusterWideKey keys.ClusterWideKey) bool {
func (d *ResourceDetector) EventFilter(obj interface{}) bool {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return false
}

clusterWideKey, ok := key.(keys.ClusterWideKey)
if !ok {
klog.Errorf("Invalid key")
return false
}

if names.IsReservedNamespace(clusterWideKey.Namespace) {
return true
return false
}

if d.SkippedResourceConfig != nil {
if d.SkippedResourceConfig.GroupDisabled(clusterWideKey.Group) {
klog.V(4).Infof("Skip propagating %s", clusterWideKey.Group)
return true
klog.V(4).Infof("Skip event for %s", clusterWideKey.Group)
return false
}
if d.SkippedResourceConfig.GroupVersionDisabled(clusterWideKey.GroupVersion()) {
klog.V(4).Infof("Skip propagating %s", clusterWideKey.GroupVersion())
return true
klog.V(4).Infof("Skip event for %s", clusterWideKey.GroupVersion())
return false
}
if d.SkippedResourceConfig.GroupVersionKindDisabled(clusterWideKey.GroupVersionKind()) {
klog.V(4).Infof("Skip propagating %s", clusterWideKey.GroupVersionKind())
return true
klog.V(4).Infof("Skip event for %s", clusterWideKey.GroupVersionKind())
return false
}
}
// if SkippedPropagatingNamespaces is set, skip object events in these namespaces.
if _, ok := d.SkippedPropagatingNamespaces[clusterWideKey.Namespace]; ok {
klog.V(4).Infof("Skip propagating resources in %s", clusterWideKey.Namespace)
return true
return false
}

return false
return true
}

// OnAdd handles object add event and push the object to queue.
Expand Down Expand Up @@ -508,6 +527,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
}
return nil
})

if err != nil {
klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err)
return err
Expand Down Expand Up @@ -701,6 +721,14 @@ func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unst
return binding, nil
}

// isWaiting indicates if the object is in waiting list.
func (d *ResourceDetector) isWaiting(objectKey keys.ClusterWideKey) bool {
d.waitingLock.RLock()
_, ok := d.waitingObjects[objectKey]
d.waitingLock.RUnlock()
return ok
}

// AddWaiting adds object's key to waiting list.
func (d *ResourceDetector) AddWaiting(objectKey keys.ClusterWideKey) {
d.waitingLock.Lock()
Expand Down Expand Up @@ -744,6 +772,33 @@ func (d *ResourceDetector) GetMatching(resourceSelectors []policyv1alpha1.Resour
return matchedResult
}

// OnPropagationPolicyAdd handles object add event and push the object to queue.
func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return
}

klog.V(2).Infof("Create PropagationPolicy(%s)", key)
d.policyReconcileWorker.Add(key)
}

// OnPropagationPolicyUpdate handles object update event and push the object to queue.
func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) {
// currently do nothing, since a policy's resource selector can not be updated.
}

// OnPropagationPolicyDelete handles object delete event and push the object to queue.
func (d *ResourceDetector) OnPropagationPolicyDelete(obj interface{}) {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return
}

klog.V(2).Infof("Delete PropagationPolicy(%s)", key)
d.policyReconcileWorker.Add(key)
}

// ReconcilePropagationPolicy handles PropagationPolicy resource changes.
// When adding a PropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and
// put the object to queue.
Expand Down Expand Up @@ -775,6 +830,33 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error {
return d.HandlePropagationPolicyCreation(propagationObject)
}

// OnClusterPropagationPolicyAdd handles object add event and push the object to queue.
func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return
}

klog.V(2).Infof("Create ClusterPropagationPolicy(%s)", key)
d.clusterPolicyReconcileWorker.Add(key)
}

// OnClusterPropagationPolicyUpdate handles object update event and push the object to queue.
func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj interface{}) {
// currently do nothing, since a policy's resource selector can not be updated.
}

// OnClusterPropagationPolicyDelete handles object delete event and push the object to queue.
func (d *ResourceDetector) OnClusterPropagationPolicyDelete(obj interface{}) {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return
}

klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key)
d.clusterPolicyReconcileWorker.Add(key)
}

// ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes.
// When adding a ClusterPropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and
// put the object to queue.
Expand Down