Skip to content

Commit

Permalink
feat: add, remove finalizer for CRB in Scheduler and scheduler watcher for CRB (#924)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arvindthiru authored Nov 5, 2024
1 parent cb9a7a0 commit 848ada9
Show file tree
Hide file tree
Showing 12 changed files with 652 additions and 44 deletions.
6 changes: 6 additions & 0 deletions apis/placement/v1beta1/binding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
	// SchedulerCRBCleanupFinalizer is a finalizer added by the scheduler to ClusterResourceBindings.
	// It keeps a deleting ClusterResourceBinding around long enough for the scheduler to look up the
	// corresponding CRP name, so that the deletion can trigger a new scheduling cycle.
	SchedulerCRBCleanupFinalizer = fleetPrefix + "scheduler-crb-cleanup"
)

// +kubebuilder:object:root=true
// +kubebuilder:resource:scope=Cluster,categories={fleet,fleet-placement},shortName=rb
// +kubebuilder:subresource:status
Expand Down
10 changes: 10 additions & 0 deletions cmd/hubagent/workload/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.goms.io/fleet/pkg/scheduler/framework"
"go.goms.io/fleet/pkg/scheduler/profile"
"go.goms.io/fleet/pkg/scheduler/queue"
schedulercrbwatcher "go.goms.io/fleet/pkg/scheduler/watchers/clusterresourcebinding"
schedulercrpwatcher "go.goms.io/fleet/pkg/scheduler/watchers/clusterresourceplacement"
schedulercspswatcher "go.goms.io/fleet/pkg/scheduler/watchers/clusterschedulingpolicysnapshot"
"go.goms.io/fleet/pkg/scheduler/watchers/membercluster"
Expand Down Expand Up @@ -278,6 +279,15 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
return err
}

klog.Info("Setting up the clusterResourceBinding watcher for scheduler")
if err := (&schedulercrbwatcher.Reconciler{
Client: mgr.GetClient(),
SchedulerWorkQueue: defaultSchedulingQueue,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up clusterResourceBinding watcher for scheduler")
return err
}

klog.Info("Setting up the memberCluster watcher for scheduler")
if err := (&membercluster.Reconciler{
Client: mgr.GetClient(),
Expand Down
73 changes: 50 additions & 23 deletions pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
Expand Down Expand Up @@ -287,15 +288,21 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
// * dangling bindings, i.e., bindings that are associated with a cluster that is no longer
// in a normally operating state (the cluster has left the fleet, or is in the state of leaving),
// yet has not been marked as unscheduled by the scheduler; and
//
// * deleting bindings, i.e., bindings that have a deletionTimeStamp on them.
// Any deleted binding is also ignored.
// Note that bindings marked as unscheduled are ignored by the scheduler, as they
// are irrelevant to the scheduling cycle. However, we will reconcile them with the latest scheduling
// result so that we won't have a ever increasing chain of flip flop bindings.
bound, scheduled, obsolete, unscheduled, dangling := classifyBindings(policy, bindings, clusters)
bound, scheduled, obsolete, unscheduled, dangling, deleting := classifyBindings(policy, bindings, clusters)

// Remove scheduler CRB cleanup finalizer on all deleting bindings.
if err := f.updateBindings(ctx, deleting, removeFinalizerAndUpdate); err != nil {
klog.ErrorS(err, "Failed to remove finalizers from deleting bindings", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}

// Mark all dangling bindings as unscheduled.
if err := f.markAsUnscheduledFor(ctx, dangling); err != nil {
if err := f.updateBindings(ctx, dangling, markUnscheduledForAndUpdate); err != nil {
klog.ErrorS(err, "Failed to mark dangling bindings as unscheduled", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -349,31 +356,51 @@ func (f *framework) collectBindings(ctx context.Context, crpName string) ([]plac
return bindingList.Items, nil
}

// markAsUnscheduledFor marks a list of bindings as unscheduled.
func (f *framework) markAsUnscheduledFor(ctx context.Context, bindings []*placementv1beta1.ClusterResourceBinding) error {
// markUnscheduledForAndUpdate marks a binding as unscheduled and updates it.
var markUnscheduledForAndUpdate = func(ctx context.Context, hubClient client.Client, binding *placementv1beta1.ClusterResourceBinding) error {
	// Remember the previous binding state so that we might be able to revert this change if this
	// cluster is being selected again before the resources are removed from it. Note that
	// SetAnnotations replaces the whole annotation map; we need to do a get and set if
	// we add more annotations to the binding.
	binding.SetAnnotations(map[string]string{placementv1beta1.PreviousBindingStateAnnotation: string(binding.Spec.State)})
	// Mark the binding as unscheduled; this can conflict with the rollout controller, which also changes
	// the state of a binding from "scheduled" to "bound".
	binding.Spec.State = placementv1beta1.BindingStateUnscheduled
	err := hubClient.Update(ctx, binding, &client.UpdateOptions{})
	if err == nil {
		klog.V(2).InfoS("Marked binding as unscheduled", "clusterResourceBinding", klog.KObj(binding))
	}
	return err
}

// removeFinalizerAndUpdate removes scheduler CRB cleanup finalizer from ClusterResourceBinding and updates it.
var removeFinalizerAndUpdate = func(ctx context.Context, hubClient client.Client, binding *placementv1beta1.ClusterResourceBinding) error {
	// Drop the scheduler's cleanup finalizer; this unblocks the actual deletion of the binding.
	controllerutil.RemoveFinalizer(binding, placementv1beta1.SchedulerCRBCleanupFinalizer)
	if updateErr := hubClient.Update(ctx, binding, &client.UpdateOptions{}); updateErr != nil {
		return updateErr
	}
	klog.V(2).InfoS("Removed scheduler CRB cleanup finalizer", "clusterResourceBinding", klog.KObj(binding))
	return nil
}

// updateBindings iterates over bindings and updates them using the update function provided.
func (f *framework) updateBindings(ctx context.Context, bindings []*placementv1beta1.ClusterResourceBinding, updateFn func(ctx context.Context, client client.Client, binding *placementv1beta1.ClusterResourceBinding) error) error {
// issue all the update requests in parallel
errs, cctx := errgroup.WithContext(ctx)
for _, binding := range bindings {
unscheduledBinding := binding
updateBinding := binding
errs.Go(func() error {
return retry.OnError(retry.DefaultBackoff,
func(err error) bool {
return apierrors.IsServiceUnavailable(err) || apierrors.IsServerTimeout(err) || apierrors.IsConflict(err)
},
func() error {
// Remember the previous unscheduledBinding state so that we might be able to revert this change if this
// cluster is being selected again before the resources are removed from it. Need to do a get and set if
// we add more annotations to the binding.
unscheduledBinding.SetAnnotations(map[string]string{placementv1beta1.PreviousBindingStateAnnotation: string(unscheduledBinding.Spec.State)})
// Mark the unscheduledBinding as unscheduled which can conflict with the rollout controller which also changes the state of a
// unscheduledBinding from "scheduled" to "bound".
unscheduledBinding.Spec.State = placementv1beta1.BindingStateUnscheduled
err := f.client.Update(cctx, unscheduledBinding, &client.UpdateOptions{})
klog.V(2).InfoS("Marking binding as unscheduled", "clusterResourceBinding", klog.KObj(unscheduledBinding), "error", err)
// We will just retry for conflict errors since the scheduler holds the truth here.
err := updateFn(cctx, f.client, updateBinding)
// We will retry on conflicts.
if apierrors.IsConflict(err) {
// get the binding again to make sure we have the latest version to update again.
return f.client.Get(cctx, client.ObjectKeyFromObject(unscheduledBinding), unscheduledBinding)
if getErr := f.client.Get(cctx, client.ObjectKeyFromObject(updateBinding), updateBinding); getErr != nil {
return getErr
}
}
return err
})
Expand Down Expand Up @@ -656,7 +683,7 @@ func (f *framework) manipulateBindings(
//
// This is set to happen after new bindings are created and old bindings are updated, to
// avoid interruptions (deselected then reselected) in a best effort manner.
if err := f.markAsUnscheduledFor(ctx, toDelete); err != nil {
if err := f.updateBindings(ctx, toDelete, markUnscheduledForAndUpdate); err != nil {
klog.ErrorS(err, "Failed to mark bindings as unschedulable", "clusterSchedulingPolicySnapshot", policyRef)
return err
}
Expand Down Expand Up @@ -809,7 +836,7 @@ func (f *framework) runSchedulingCycleForPickNPlacementType(
klog.V(2).InfoS("Downscaling is needed", "clusterSchedulingPolicySnapshot", policyRef, "downscaleCount", downscaleCount)

// Mark all obsolete bindings as unscheduled first.
if err := f.markAsUnscheduledFor(ctx, obsolete); err != nil {
if err := f.updateBindings(ctx, obsolete, markUnscheduledForAndUpdate); err != nil {
klog.ErrorS(err, "Failed to mark obsolete bindings as unscheduled", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -986,10 +1013,10 @@ func (f *framework) downscale(ctx context.Context, scheduled, bound []*placement
bindingsToDelete = append(bindingsToDelete, sortedScheduled[i])
}

return sortedScheduled[count:], bound, f.markAsUnscheduledFor(ctx, bindingsToDelete)
return sortedScheduled[count:], bound, f.updateBindings(ctx, bindingsToDelete, markUnscheduledForAndUpdate)
case count == len(scheduled):
// Trim all scheduled bindings.
return nil, bound, f.markAsUnscheduledFor(ctx, scheduled)
return nil, bound, f.updateBindings(ctx, scheduled, markUnscheduledForAndUpdate)
case count < len(scheduled)+len(bound):
// Trim all scheduled bindings and part of bound bindings.
bindingsToDelete := make([]*placementv1beta1.ClusterResourceBinding, 0, count)
Expand All @@ -1010,13 +1037,13 @@ func (f *framework) downscale(ctx context.Context, scheduled, bound []*placement
bindingsToDelete = append(bindingsToDelete, sortedBound[i])
}

return nil, sortedBound[left:], f.markAsUnscheduledFor(ctx, bindingsToDelete)
return nil, sortedBound[left:], f.updateBindings(ctx, bindingsToDelete, markUnscheduledForAndUpdate)
case count == len(scheduled)+len(bound):
// Trim all scheduled and bound bindings.
bindingsToDelete := make([]*placementv1beta1.ClusterResourceBinding, 0, count)
bindingsToDelete = append(bindingsToDelete, scheduled...)
bindingsToDelete = append(bindingsToDelete, bound...)
return nil, nil, f.markAsUnscheduledFor(ctx, bindingsToDelete)
return nil, nil, f.updateBindings(ctx, bindingsToDelete, markUnscheduledForAndUpdate)
default:
// Normally this branch will never run, as an earlier check has guaranteed that
// count <= len(scheduled) + len(bound).
Expand Down
Loading

0 comments on commit 848ada9

Please sign in to comment.