automatically inject default queue if not provided #598

Open · wants to merge 1 commit into base: main
9 changes: 9 additions & 0 deletions pkg/controllers/raycluster_webhook.go
@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
@@ -45,6 +46,7 @@ var rayclusterlog = logf.Log.WithName("raycluster-resource")

func SetupRayClusterWebhookWithManager(mgr ctrl.Manager, cfg *config.KubeRayConfiguration) error {
rayClusterWebhookInstance := &rayClusterWebhook{
Client: mgr.GetClient(),
Config: cfg,
}
return ctrl.NewWebhookManagedBy(mgr).
@@ -58,6 +60,7 @@ func SetupRayClusterWebhookWithManager(mgr ctrl.Manager, cfg *config.KubeRayConf
// +kubebuilder:webhook:path=/validate-ray-io-v1-raycluster,mutating=false,failurePolicy=fail,sideEffects=None,groups=ray.io,resources=rayclusters,verbs=create;update,versions=v1,name=vraycluster.ray.openshift.ai,admissionReviewVersions=v1

type rayClusterWebhook struct {
client.Client
Config *config.KubeRayConfiguration
}

@@ -77,6 +80,12 @@ func (w *rayClusterWebhook) Default(ctx context.Context, obj runtime.Object) err
rayCluster.Spec.HeadGroupSpec.Template.Spec.ServiceAccountName = rayCluster.Name + "-oauth-proxy"
}

// add default queue label if not present
Collaborator:

If the RayCluster is owned by a Kueue-managed object (e.g. an AppWrapper), then injecting the queue-name label on a RayCluster that does not have one is not correct. This functionality at least needs to be guarded by configuration on the operator that allows it to be disabled. To actually be robust, it should check that the RayCluster is not owned by an AppWrapper.

Collaborator Author:

I think this might actually warrant a webhook on the Workload CRD that checks whether spec.queue is set. I don't understand why this approach is incorrect, though: using a mutating webhook to inject defaults is a normal use case for mutating webhooks, and since the value is changed before the object is persisted to etcd it shouldn't cause re-reconciliation loops. Does this have something to do with AppWrapper functionality that I'm missing?

Collaborator (@dgrove-oss, Jul 23, 2024):

My comment comes from Kueue's logic for supporting parent/child relationships for workloads. Kueue's defaulting webhooks detect that the object being created is a child of a Kueue-managed parent object (encoded by a controller reference) and do not set the default value for suspend on a child job.
https://github.com/kubernetes-sigs/kueue/blob/release-0.8/pkg/controller/jobframework/defaults.go#L23-L34

I'd suggest adding a similar check here, to avoid adding a label that has no effect at runtime and might confuse people. If the RayCluster is controlled by an AppWrapper, then it's the queue-name on the AppWrapper that matters; the queue-name (or lack of one) on the RayCluster does nothing.

You can't easily call IsOwnerManagedByKueue(owner), but you could either add a check for the AppWrapper GVK or just check whether the RayCluster is controlled by anything at all (for standalone clusters, I think it should not be).
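A minimal sketch of the guard being suggested here, assuming the defaulting should only apply to standalone RayClusters; shouldInjectDefaultQueue is an illustrative helper name, not part of this PR:

```go
import (
	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Sketch only: skip defaulting when the RayCluster is controlled by a parent
// object, mirroring Kueue's parent/child defaulting behaviour.
func shouldInjectDefaultQueue(rayCluster *rayv1.RayCluster) bool {
	owner := metav1.GetControllerOf(rayCluster)
	if owner == nil {
		// No controller reference: a standalone cluster, safe to default.
		return true
	}
	// Narrower alternative: skip only for known Kueue-managed parents.
	if owner.Kind == "AppWrapper" {
		return false
	}
	// Conservative choice: leave any controlled RayCluster untouched.
	return false
}
```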

Contributor:

I think @dgrove-oss isn't questioning the use of the webhook, but raising the point that the logic should only apply to the top-level resource managed by Kueue in the ownership graph.

Contributor (@astefanutti, Jul 23, 2024):

I wonder if the functionality that this PR adds for RayClusters could be generalised and proposed upstream?

Collaborator:

An upstream generalization would be nice. For AppWrappers, we went with adding an optional defaultQueueName to the AppWrapper operator configuration. It's less customizable (a single default queue name for all namespaces), but it covered the use case we had for MLBatch. If an AppWrapper doesn't have a queue-name and the operator is configured with a default, the webhook injects it. But it only works for AppWrappers.
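For illustration, a minimal sketch of that configuration-driven variant; the defaultQueueName parameter stands in for the operator configuration field described above, and the helper name is hypothetical:

```go
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// Sketch only: inject a single operator-wide default queue name when the
// object has no queue-name label and the operator is configured with one.
func withConfiguredDefaultQueue(obj metav1.Object, defaultQueueName string) {
	if defaultQueueName == "" {
		return // operator not configured with a default
	}
	labels := obj.GetLabels()
	if labels == nil {
		labels = map[string]string{}
	}
	if _, ok := labels["kueue.x-k8s.io/queue-name"]; ok {
		return // an explicit queue name always wins
	}
	labels["kueue.x-k8s.io/queue-name"] = defaultQueueName
	obj.SetLabels(labels)
}
```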

Collaborator Author:

Ahh, I see. Thanks @dgrove-oss.

err := withDefaultLocalQueue(ctx, rayCluster, w.Client)
if err != nil {
return err
}

if ptr.Deref(w.Config.MTLSEnabled, true) {
rayclusterlog.V(2).Info("Adding create-cert Init Containers")
// HeadGroupSpec
2 changes: 2 additions & 0 deletions pkg/controllers/raycluster_webhook_test.go
@@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/project-codeflare/codeflare-operator/pkg/config"
)
@@ -35,6 +36,7 @@ var (
rayClusterName = "test-raycluster"

rcWebhook = &rayClusterWebhook{
Client: fake.NewFakeClient(),
Config: &config.KubeRayConfiguration{},
}
)
73 changes: 71 additions & 2 deletions pkg/controllers/support.go
@@ -1,26 +1,39 @@
package controllers

import (
"context"
"os"
"strconv"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/validation/field"
v1 "k8s.io/client-go/applyconfigurations/meta/v1"
networkingv1ac "k8s.io/client-go/applyconfigurations/networking/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
kueuev1beta1 "sigs.k8s.io/kueue/apis/kueue/v1beta1"

routeapply "github.com/openshift/client-go/route/applyconfigurations/route/v1"
)

var (
CertGeneratorImage = getEnv("CERT_GENERATOR_IMAGE", "registry.redhat.io/ubi9@sha256:770cf07083e1c85ae69c25181a205b7cdef63c11b794c89b3b487d4670b4c328")
OAuthProxyImage = getEnv("OAUTH_PROXY_IMAGE", "registry.redhat.io/openshift4/ose-oauth-proxy@sha256:1ea6a01bf3e63cdcf125c6064cbd4a4a270deaf0f157b3eabb78f60556840366")
CertGeneratorImage = getEnv("CERT_GENERATOR_IMAGE", "registry.redhat.io/ubi9@sha256:770cf07083e1c85ae69c25181a205b7cdef63c11b794c89b3b487d4670b4c328")
OAuthProxyImage = getEnv("OAUTH_PROXY_IMAGE", "registry.redhat.io/openshift4/ose-oauth-proxy@sha256:1ea6a01bf3e63cdcf125c6064cbd4a4a270deaf0f157b3eabb78f60556840366")
DefaultLocalQueueLabel = "kueue.x-k8s.io/default-queue"
LocalQueueLabel = "kueue.x-k8s.io/queue-name"
)

type HasMetadata interface {
GetObjectMeta() *metav1.ObjectMeta
}

func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
@@ -172,3 +185,59 @@ func withEnvVarName(name string) compare[corev1.EnvVar] {
return e1.Name == name
}
}

func withDefaultLocalQueue(ctx context.Context, k8Obj interface{}, c client.Client) error {
logger := ctrl.LoggerFrom(ctx)
metaAccessor, err := meta.Accessor(k8Obj)
if err != nil {
logger.Error(err, "Object does not have required metadata")
return err
}
// k8Meta := k8Obj.GetObjectMeta()
labels := metaAccessor.GetLabels()
if labels == nil {
labels = make(map[string]string)
}

_, ok := labels[LocalQueueLabel]
if ok {
return nil
}

// for accessing default Kind and Version
emptyLocalQueue := kueuev1beta1.LocalQueue{}

localQueueMetaList := metav1.PartialObjectMetadataList{
TypeMeta: metav1.TypeMeta{
APIVersion: emptyLocalQueue.APIVersion,
Kind: emptyLocalQueue.Kind,
},
}
err = c.List(ctx, &localQueueMetaList)

if client.IgnoreNotFound(err) != nil {
logger.Error(err, "Failed to list LocalQueues")
return err
} else if err != nil {
logger.Info("LocalQueue CRD not found")
return nil
}

for _, localQMeta := range localQueueMetaList.Items {
isDefault, ok := localQMeta.Labels[DefaultLocalQueueLabel]
if !ok {
continue
}
boolIsDefault, err := strconv.ParseBool(isDefault)
if err != nil {
logger.Error(err, "Failed to parse bool")
continue
}
if boolIsDefault {
labels[LocalQueueLabel] = localQMeta.Name
break
}
}
metaAccessor.SetLabels(labels)
return nil
}