Skip to content

Commit

Permalink
[multikueue] Batch remote workload reconcile events. (#2380)
Browse files Browse the repository at this point in the history
* [multikueue] Batch remote workload reconcile events.

* Review remarks
  • Loading branch information
trasc authored Jun 11, 2024
1 parent 01bffcb commit 1ae95f3
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 6 deletions.
14 changes: 13 additions & 1 deletion pkg/controller/admissionchecks/multikueue/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"time"

ctrl "sigs.k8s.io/controller-runtime"

"sigs.k8s.io/kueue/pkg/constants"
)

const (
Expand All @@ -32,6 +34,7 @@ type SetupOptions struct {
gcInterval time.Duration
origin string
workerLostTimeout time.Duration
eventsBatchPeriod time.Duration
}

type SetupOption func(o *SetupOptions)
Expand Down Expand Up @@ -60,11 +63,20 @@ func WithWorkerLostTimeout(d time.Duration) SetupOption {
}
}

// WithEventsBatchPeriod returns a SetupOption that records d as the events
// batch period: the delay applied when remote triggered events are added to
// the workload's reconcile queue.
func WithEventsBatchPeriod(d time.Duration) SetupOption {
	applyBatchPeriod := func(opts *SetupOptions) {
		opts.eventsBatchPeriod = d
	}
	return applyBatchPeriod
}

func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
options := &SetupOptions{
gcInterval: defaultGCInterval,
origin: defaultOrigin,
workerLostTimeout: defaultWorkerLostTimeout,
eventsBatchPeriod: constants.UpdatesBatchPeriod,
}

for _, o := range opts {
Expand Down Expand Up @@ -94,6 +106,6 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e
return err
}

wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout)
wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout, options.eventsBatchPeriod)
return wlRec.setupWithManager(mgr)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package multikueue
import (
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -347,7 +348,7 @@ func TestWlReconcileJobset(t *testing.T) {
cRec.remoteClients["worker1"] = w1remoteClient

helper, _ := newMultiKueueStoreHelper(managerClient)
reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaultWorkerLostTimeout)
reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaultWorkerLostTimeout, time.Second)

for _, val := range tc.managersDeletedWorkloads {
reconciler.Delete(event.DeleteEvent{
Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/admissionchecks/multikueue/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type wlReconciler struct {
origin string
workerLostTimeout time.Duration
deletedWlCache *utilmaps.SyncMap[string, *kueue.Workload]
eventsBatchPeriod time.Duration
}

var _ reconcile.Reconciler = (*wlReconciler)(nil)
Expand Down Expand Up @@ -458,24 +459,25 @@ func (w *wlReconciler) Generic(_ event.GenericEvent) bool {
return true
}

func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, origin string, workerLostTimeout time.Duration) *wlReconciler {
// newWlReconciler assembles a wlReconciler from the supplied client, store
// helper and clusters reconciler, together with the origin value and the
// worker-lost timeout. eventsBatchPeriod is stored on the reconciler as the
// delay to use when queueing events. The deleted-workloads cache always
// starts out as a freshly created sync map.
func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, origin string, workerLostTimeout, eventsBatchPeriod time.Duration) *wlReconciler {
	rec := &wlReconciler{
		deletedWlCache: utilmaps.NewSyncMap[string, *kueue.Workload](0),
	}

	// Dependencies handed in by the caller.
	rec.client = c
	rec.helper = helper
	rec.clusters = cRec

	// Behavioural settings.
	rec.origin = origin
	rec.workerLostTimeout = workerLostTimeout
	rec.eventsBatchPeriod = eventsBatchPeriod

	return rec
}

func (w *wlReconciler) setupWithManager(mgr ctrl.Manager) error {
syncHndl := handler.Funcs{
GenericFunc: func(_ context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
q.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: e.Object.GetNamespace(),
Name: e.Object.GetName(),
}})
}}, w.eventsBatchPeriod)
},
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/admissionchecks/multikueue/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ func TestWlReconcile(t *testing.T) {
}

helper, _ := newMultiKueueStoreHelper(managerClient)
reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaultWorkerLostTimeout)
reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaultWorkerLostTimeout, time.Second)

for _, val := range tc.managersDeletedWorkloads {
reconciler.Delete(event.DeleteEvent{
Expand Down
1 change: 1 addition & 0 deletions test/integration/multikueue/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func managerAndMultiKueueSetup(mgr manager.Manager, ctx context.Context) {
err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name,
multikueue.WithGCInterval(2*time.Second),
multikueue.WithWorkerLostTimeout(testingWorkerLostTimeout),
multikueue.WithEventsBatchPeriod(100*time.Millisecond),
)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

0 comments on commit 1ae95f3

Please sign in to comment.