Skip to content

Commit

Permalink
[multikueue] Remove remote objects synchronously when reachable. (kubernetes-sigs#2347)
Browse files Browse the repository at this point in the history

* [multikueue] Remove remote objects synchronously when reachable.

* Extend unit test coverage.

* Review remarks
  • Loading branch information
trasc authored and Fiona-Waters committed Jun 25, 2024
1 parent 3268107 commit 23c4592
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 12 deletions.
55 changes: 49 additions & 6 deletions pkg/controller/admissionchecks/multikueue/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand All @@ -59,6 +60,7 @@ type wlReconciler struct {
clusters *clustersReconciler
origin string
workerLostTimeout time.Duration
deletedWlCache *utilmaps.SyncMap[string, *kueue.Workload]
}

var _ reconcile.Reconciler = (*wlReconciler)(nil)
Expand Down Expand Up @@ -156,13 +158,22 @@ func (w *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
log := ctrl.LoggerFrom(ctx)
log.V(2).Info("Reconcile Workload")
wl := &kueue.Workload{}
if err := w.client.Get(ctx, req.NamespacedName, wl); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
isDeleted := false
err := w.client.Get(ctx, req.NamespacedName, wl)
switch {
case client.IgnoreNotFound(err) != nil:
return reconcile.Result{}, err
case err != nil:
oldWl, found := w.deletedWlCache.Get(req.String())
if !found {
return reconcile.Result{}, nil
}
wl = oldWl
isDeleted = true

default:
isDeleted = !wl.DeletionTimestamp.IsZero()
}
// NOTE: the not found needs to be treated and should result in the deletion of all the remote workloads.
// since the list of remotes can only be taken from its list of admission check stats we need to either
// 1. use a finalizer
// 2. try to trigger the remote deletion from an event filter.

mkAc, err := w.multikueueAC(ctx, wl)
if err != nil {
Expand Down Expand Up @@ -200,6 +211,17 @@ func (w *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
return reconcile.Result{}, err
}

if isDeleted {
for cluster := range grp.remotes {
err := grp.RemoveRemoteObjects(ctx, cluster)
if err != nil {
return reconcile.Result{}, err
}
}
w.deletedWlCache.Delete(req.String())
return reconcile.Result{}, nil
}

return w.reconcileGroup(ctx, grp)
}

Expand Down Expand Up @@ -413,13 +435,33 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
return reconcile.Result{}, errors.Join(errs...)
}

// Create is the create-event hook of the event filter that wlReconciler
// registers for itself (see WithEventFilter in setupWithManager).
// It always returns true, so no create event is filtered out.
func (w *wlReconciler) Create(_ event.CreateEvent) bool {
return true
}

// Delete is the delete-event hook of the reconciler's event filter.
//
// When the event carries a *kueue.Workload and its delete state is known,
// the object is stored in deletedWlCache under its namespaced-name string so
// that Reconcile can still look it up after the Get for that key reports
// "not found". The event itself is never filtered out: the method always
// returns true.
func (w *wlReconciler) Delete(de event.DeleteEvent) bool {
	// Events flagged with DeleteStateUnknown are not cached.
	if de.DeleteStateUnknown {
		return true
	}

	deletedWl, ok := de.Object.(*kueue.Workload)
	if !ok {
		return true
	}

	key := client.ObjectKeyFromObject(deletedWl).String()
	w.deletedWlCache.Add(key, deletedWl)
	return true
}

// Update is the update-event hook of the reconciler's event filter.
// It always returns true, so no update event is filtered out.
func (w *wlReconciler) Update(_ event.UpdateEvent) bool {
return true
}

// Generic is the generic-event hook of the reconciler's event filter.
// It always returns true, so no generic event is filtered out.
func (w *wlReconciler) Generic(_ event.GenericEvent) bool {
return true
}

// newWlReconciler assembles a wlReconciler from its dependencies: the manager
// client c, the store helper, the clusters reconciler cRec, the origin value
// and the worker-lost timeout. The reconciler starts with an empty
// deleted-workload cache.
func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, origin string, workerLostTimeout time.Duration) *wlReconciler {
	rec := new(wlReconciler)

	rec.client = c
	rec.helper = helper
	rec.clusters = cRec
	rec.origin = origin
	rec.workerLostTimeout = workerLostTimeout

	// Filled by the Delete event hook and consumed by Reconcile.
	rec.deletedWlCache = utilmaps.NewSyncMap[string, *kueue.Workload](0)

	return rec
}

Expand All @@ -436,6 +478,7 @@ func (w *wlReconciler) setupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&kueue.Workload{}).
WatchesRawSource(&source.Channel{Source: w.clusters.wlUpdateCh}, syncHndl).
WithEventFilter(w).
Complete(w)
}

Expand Down
54 changes: 49 additions & 5 deletions pkg/controller/admissionchecks/multikueue/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
Expand Down Expand Up @@ -57,11 +58,12 @@ func TestWlReconcile(t *testing.T) {
baseJobBuilder := testingjob.MakeJob("job1", TestNamespace)

cases := map[string]struct {
reconcileFor string
managersWorkloads []kueue.Workload
managersJobs []batchv1.Job
worker1Workloads []kueue.Workload
worker1Jobs []batchv1.Job
reconcileFor string
managersWorkloads []kueue.Workload
managersJobs []batchv1.Job
managersDeletedWorkloads []*kueue.Workload
worker1Workloads []kueue.Workload
worker1Jobs []batchv1.Job
// second worker
useSecondWorker bool
worker2Reconnecting bool
Expand All @@ -84,6 +86,38 @@ func TestWlReconcile(t *testing.T) {
"missing workload": {
reconcileFor: "missing workload",
},
"missing workload (in deleted workload cache)": {
reconcileFor: "wl1",
managersDeletedWorkloads: []*kueue.Workload{
baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
worker1Jobs: []batchv1.Job{
*baseJobBuilder.Clone().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
},
},
"missing workload (in deleted workload cache), no remote objects": {
reconcileFor: "wl1",
managersDeletedWorkloads: []*kueue.Workload{
baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
},
"unmanaged wl (no ac) is ignored": {
reconcileFor: "wl1",
managersWorkloads: []kueue.Workload{
Expand Down Expand Up @@ -709,6 +743,12 @@ func TestWlReconcile(t *testing.T) {
helper, _ := newMultiKueueStoreHelper(managerClient)
reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaultWorkerLostTimeout)

for _, val := range tc.managersDeletedWorkloads {
reconciler.Delete(event.DeleteEvent{
Object: val,
})
}

_, gotErr := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: tc.reconcileFor, Namespace: TestNamespace}})
if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" {
t.Errorf("unexpected error (-want/+got):\n%s", diff)
Expand Down Expand Up @@ -769,6 +809,10 @@ func TestWlReconcile(t *testing.T) {
}
}
}

if l := reconciler.deletedWlCache.Len(); l > 0 {
t.Errorf("unexpected deletedWlCache len %d expecting 0", l)
}
})
}
}
38 changes: 38 additions & 0 deletions pkg/util/maps/maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package maps
import (
"fmt"
"maps"
"sync"

"k8s.io/apimachinery/pkg/util/sets"
)
Expand Down Expand Up @@ -119,3 +120,40 @@ func DeepCopySets[T comparable](src map[string]sets.Set[T]) map[string]sets.Set[
}
return copy
}

// SyncMap is a generic map guarded by a sync.RWMutex: writers (Add, Delete)
// take the exclusive lock, readers (Get, Len) take the shared lock.
//
// The zero value is an empty map that is ready to use. Because it contains a
// mutex, a SyncMap must not be copied after first use; share it by pointer.
type SyncMap[K comparable, V any] struct {
	lock sync.RWMutex
	m    map[K]V
}

// NewSyncMap returns an empty SyncMap whose underlying map is created with
// size as its initial capacity hint.
func NewSyncMap[K comparable, V any](size int) *SyncMap[K, V] {
	return &SyncMap[K, V]{
		m: make(map[K]V, size),
	}
}

// Add stores v under k, replacing any value previously stored for k.
func (sm *SyncMap[K, V]) Add(k K, v V) {
	sm.lock.Lock()
	defer sm.lock.Unlock()
	if sm.m == nil {
		// Allocate lazily so a zero-value SyncMap does not panic on its
		// first write (assigning into a nil map panics).
		sm.m = make(map[K]V)
	}
	sm.m[k] = v
}

// Get returns the value stored under k and whether k was present.
// When k is absent the zero value of V is returned together with false.
func (sm *SyncMap[K, V]) Get(k K) (V, bool) {
	sm.lock.RLock()
	defer sm.lock.RUnlock()
	v, found := sm.m[k]
	return v, found
}

// Len returns the number of entries currently stored.
func (sm *SyncMap[K, V]) Len() int {
	sm.lock.RLock()
	defer sm.lock.RUnlock()
	return len(sm.m)
}

// Delete removes the entry for k; it is a no-op when k is not present.
func (sm *SyncMap[K, V]) Delete(k K) {
	sm.lock.Lock()
	defer sm.lock.Unlock()
	delete(sm.m, k)
}
Loading

0 comments on commit 23c4592

Please sign in to comment.