Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HA support for the visibility API #1554

Merged
merged 5 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/controller/core/admissioncheck_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/queue"
Expand Down Expand Up @@ -214,13 +217,14 @@ func (h *acCqHandler) Generic(ctx context.Context, e event.GenericEvent, q workq
}

// SetupWithManager sets up the controller with the Manager.
func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error {
handler := acCqHandler{
cache: r.cache,
}
return ctrl.NewControllerManagedBy(mgr).
For(&kueue.AdmissionCheck{}).
WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}).
WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler).
WithEventFilter(r).
Complete(r)
Complete(WithLeadingManager(mgr, r, &kueue.AdmissionCheck{}, cfg))
}
8 changes: 6 additions & 2 deletions pkg/controller/core/clusterqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
Expand Down Expand Up @@ -594,7 +597,7 @@ func (h *cqSnapshotHandler) Generic(_ context.Context, e event.GenericEvent, q w
}

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error {
wHandler := cqWorkloadHandler{
qManager: r.qManager,
}
Expand All @@ -613,13 +616,14 @@ func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager) error {
}
return ctrl.NewControllerManagedBy(mgr).
For(&kueue.ClusterQueue{}).
WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}).
Watches(&corev1.Namespace{}, &nsHandler).
WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &wHandler).
WatchesRawSource(&source.Channel{Source: r.rfUpdateCh}, &rfHandler).
WatchesRawSource(&source.Channel{Source: r.acUpdateCh}, &acHandler).
WatchesRawSource(&source.Channel{Source: r.snapUpdateCh}, &snapHandler).
WithEventFilter(r).
Complete(r)
Complete(WithLeadingManager(mgr, r, &kueue.ClusterQueue{}, cfg))
}

func (r *ClusterQueueReconciler) updateCqStatusIfChanged(
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ const updateChBuffer = 10
// controller that failed to create and an error, if any.
func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache, cfg *config.Configuration) (string, error) {
rfRec := NewResourceFlavorReconciler(mgr.GetClient(), qManager, cc)
if err := rfRec.SetupWithManager(mgr); err != nil {
if err := rfRec.SetupWithManager(mgr, cfg); err != nil {
return "ResourceFlavor", err
}
acRec := NewAdmissionCheckReconciler(mgr.GetClient(), qManager, cc)
if err := acRec.SetupWithManager(mgr); err != nil {
if err := acRec.SetupWithManager(mgr, cfg); err != nil {
return "AdmissionCheck", err
}
qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc)
if err := qRec.SetupWithManager(mgr); err != nil {
if err := qRec.SetupWithManager(mgr, cfg); err != nil {
return "LocalQueue", err
}

Expand All @@ -59,13 +59,13 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
}
rfRec.AddUpdateWatcher(cqRec)
acRec.AddUpdateWatchers(cqRec)
if err := cqRec.SetupWithManager(mgr); err != nil {
if err := cqRec.SetupWithManager(mgr, cfg); err != nil {
return "ClusterQueue", err
}
if err := NewWorkloadReconciler(mgr.GetClient(), qManager, cc,
mgr.GetEventRecorderFor(constants.WorkloadControllerName),
WithWorkloadUpdateWatchers(qRec, cqRec),
WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr); err != nil {
WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr, cfg); err != nil {
return "Workload", err
}
return "", nil
Expand Down
100 changes: 100 additions & 0 deletions pkg/controller/core/leader_aware_reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package core

import (
"context"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

config "sigs.k8s.io/kueue/apis/config/v1beta1"
)

// defaultRequeueDuration defaults the duration used by non-leading replicas
// to requeue events, so no events are missed over the period it takes for
// leader election to fail over a new replica.
const defaultRequeueDuration = 15 * time.Second

// WithLeadingManager returns a decorating reconcile.Reconciler that discards reconciliation requests
// for the controllers that are started with the controller.Options.NeedLeaderElection
// option set to false in non-leading replicas.
//
// Starting controllers in non-leading replicas is needed for these that update the data
// served by the visibility extension API server.
//
// This enables to:
// - Keep the scheduling decisions under the responsibility of the leading replica alone,
// to prevent any concurrency issues.
// - Consume requests from the watch event queues, to prevent them from growing indefinitely
// in the non-leading replicas.
// - Transition to actually reconciling requests in the replica that may acquire
// the leader election lease, in case the previously leading replica failed to renew it.
func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler, obj client.Object, cfg *config.Configuration) reconcile.Reconciler {
// Do not decorate the reconciler if leader election is disabled
if cfg.LeaderElection == nil || !ptr.Deref(cfg.LeaderElection.LeaderElect, false) {
return reconciler
}

// Default to the recommended lease duration, that's used for core components
requeueDuration := defaultRequeueDuration
// Otherwise used the configured lease duration for the manager
zero := metav1.Duration{}
if duration := cfg.LeaderElection.LeaseDuration; duration != zero {
requeueDuration = duration.Duration
}

return &leaderAwareReconciler{
elected: mgr.Elected(),
client: mgr.GetClient(),
delegate: reconciler,
object: obj,
requeueDuration: requeueDuration,
}
}

type leaderAwareReconciler struct {
elected <-chan struct{}
client client.Client
delegate reconcile.Reconciler
object client.Object
requeueDuration time.Duration
}

var _ reconcile.Reconciler = (*leaderAwareReconciler)(nil)

func (r *leaderAwareReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
astefanutti marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-r.elected:
// The manager has been elected leader, delegate reconciliation to the provided reconciler.
return r.delegate.Reconcile(ctx, request)
default:
if err := r.client.Get(ctx, request.NamespacedName, r.object); err != nil {
// Discard request if not found, to prevent from re-enqueueing indefinitely.
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// The manager hasn't been elected leader yet, requeue the reconciliation request
// to prevent against any missed / discarded events over the period it takes
// to fail over a new leading replica, which can take as much as the configured
// lease duration, for it to acquire leadership.
return ctrl.Result{RequeueAfter: r.requeueDuration}, nil
alculquicondor marked this conversation as resolved.
Show resolved Hide resolved
}
}
8 changes: 6 additions & 2 deletions pkg/controller/core/localqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
Expand Down Expand Up @@ -248,16 +251,17 @@ func (h *qCQHandler) addLocalQueueToWorkQueue(ctx context.Context, cq *kueue.Clu
}

// SetupWithManager sets up the controller with the Manager.
func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error {
queueCQHandler := qCQHandler{
client: r.client,
}
return ctrl.NewControllerManagedBy(mgr).
For(&kueue.LocalQueue{}).
WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}).
WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &qWorkloadHandler{}).
Watches(&kueue.ClusterQueue{}, &queueCQHandler).
WithEventFilter(r).
Complete(r)
Complete(WithLeadingManager(mgr, r, &kueue.LocalQueue{}, cfg))
}

func (r *LocalQueueReconciler) UpdateStatusIfChanged(
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/core/resourceflavor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/queue"
Expand Down Expand Up @@ -249,15 +252,16 @@ func (h *cqHandler) Generic(_ context.Context, e event.GenericEvent, q workqueue
}

// SetupWithManager sets up the controller with the Manager.
func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error {
handler := cqHandler{
cache: r.cache,
}
return ctrl.NewControllerManagedBy(mgr).
For(&kueue.ResourceFlavor{}).
WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}).
WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler).
WithEventFilter(r).
Complete(r)
Complete(WithLeadingManager(mgr, r, &kueue.ResourceFlavor{}, cfg))
}

func resourceFlavors(cq *kueue.ClusterQueue) sets.Set[kueue.ResourceFlavorReference] {
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ import (
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
Expand Down Expand Up @@ -489,17 +491,18 @@ func (r *WorkloadReconciler) notifyWatchers(oldWl, newWl *kueue.Workload) {
}

// SetupWithManager sets up the controller with the Manager.
func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error {
ruh := &resourceUpdatesHandler{
r: r,
}
return ctrl.NewControllerManagedBy(mgr).
For(&kueue.Workload{}).
WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}).
Watches(&corev1.LimitRange{}, ruh).
Watches(&nodev1.RuntimeClass{}, ruh).
Watches(&kueue.ClusterQueue{}, &workloadCqHandler{client: r.client}).
WithEventFilter(r).
Complete(r)
Complete(WithLeadingManager(mgr, r, &kueue.Workload{}, cfg))
}

// admittedNotReadyWorkload returns as a pair of values. The first boolean determines
Expand Down
4 changes: 4 additions & 0 deletions test/e2e/config/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ kind: Kustomization
resources:
- ../../../config/default

replicas:
- name: kueue-controller-manager
count: 2

patches:
- path: manager_e2e_patch.yaml
target:
Expand Down