From 79cd7fc10886eee343ee8969961e547416703ae6 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Wed, 31 Jan 2024 15:26:36 +0100 Subject: [PATCH] Add visibility API integration tests --- pkg/visibility/api/rest/utils.go | 3 +- pkg/visibility/server.go | 28 +- test/integration/visibility/suite_test.go | 116 +++++ .../integration/visibility/visibility_test.go | 449 ++++++++++++++++++ 4 files changed, 585 insertions(+), 11 deletions(-) create mode 100644 test/integration/visibility/suite_test.go create mode 100644 test/integration/visibility/visibility_test.go diff --git a/pkg/visibility/api/rest/utils.go b/pkg/visibility/api/rest/utils.go index 81b73a7c10..784606cb4a 100644 --- a/pkg/visibility/api/rest/utils.go +++ b/pkg/visibility/api/rest/utils.go @@ -18,6 +18,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + "sigs.k8s.io/kueue/pkg/util/priority" "sigs.k8s.io/kueue/pkg/workload" ) @@ -39,7 +40,7 @@ func newPendingWorkload(wlInfo *workload.Info, positionInLq int32, positionInCq CreationTimestamp: wlInfo.Obj.CreationTimestamp, }, PositionInClusterQueue: int32(positionInCq), - Priority: *wlInfo.Obj.Spec.Priority, + Priority: priority.Priority(wlInfo.Obj), LocalQueueName: wlInfo.Obj.Spec.QueueName, PositionInLocalQueue: positionInLq, } diff --git a/pkg/visibility/server.go b/pkg/visibility/server.go index 59b21b9003..a229ed41f8 100644 --- a/pkg/visibility/server.go +++ b/pkg/visibility/server.go @@ -20,17 +20,17 @@ import ( "net" "strings" - "sigs.k8s.io/kueue/apis/visibility/v1alpha1" - generatedopenapi "sigs.k8s.io/kueue/apis/visibility/v1alpha1/openapi" - "sigs.k8s.io/kueue/pkg/queue" - "sigs.k8s.io/kueue/pkg/visibility/api" - openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericapiserver "k8s.io/apiserver/pkg/server" genericoptions "k8s.io/apiserver/pkg/server/options" "k8s.io/client-go/pkg/version" _ "k8s.io/component-base/metrics/prometheus/restclient" // for client-go metrics registration ctrl "sigs.k8s.io/controller-runtime" + + "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + generatedopenapi "sigs.k8s.io/kueue/apis/visibility/v1alpha1/openapi" + "sigs.k8s.io/kueue/pkg/queue" + "sigs.k8s.io/kueue/pkg/visibility/api" ) var ( @@ -47,12 +47,11 @@ type server struct { // CreateAndStartVisibilityServer creates visibility server injecting KueueManager and starts it func CreateAndStartVisibilityServer(kueueMgr *queue.Manager, ctx context.Context) { - config := newVisibilityServerConfig() - if err := applyVisibilityServerOptions(config); err != nil { - setupLog.Error(err, "Unable to apply VisibilityServerOptions") - } + CreateAndStartVisibilityServerForConfig(CreateVisibilityServerConfig(), kueueMgr, ctx) +} - visibilityServer, err := config.Complete().New("visibility-server", genericapiserver.NewEmptyDelegate()) +func CreateAndStartVisibilityServerForConfig(config genericapiserver.CompletedConfig, kueueMgr *queue.Manager, ctx context.Context) { + visibilityServer, err := config.New("visibility-server", genericapiserver.NewEmptyDelegate()) if err != nil { setupLog.Error(err, "Unable to create visibility server") } @@ -67,6 +66,15 @@ func CreateAndStartVisibilityServer(kueueMgr *queue.Manager, ctx context.Context } } +func CreateVisibilityServerConfig() genericapiserver.CompletedConfig { + config := newVisibilityServerConfig() + if err := applyVisibilityServerOptions(config); err != nil { + setupLog.Error(err, "Unable to apply VisibilityServerOptions") + } + + return config.Complete() +} + func applyVisibilityServerOptions(config *genericapiserver.RecommendedConfig) error { o := genericoptions.NewRecommendedOptions("", api.Codecs.LegacyCodec(v1alpha1.SchemeGroupVersion)) o.Etcd = nil diff --git a/test/integration/visibility/suite_test.go b/test/integration/visibility/suite_test.go new file mode 100644 index 0000000000..c36588c198 --- /dev/null +++ b/test/integration/visibility/suite_test.go @@ -0,0 +1,116 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package visibility + +import ( + "context" + "path/filepath" + "testing" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/scheduler" + + config "sigs.k8s.io/kueue/apis/config/v1beta1" + kueueclientset "sigs.k8s.io/kueue/client-go/clientset/versioned" + visibilityv1alpha1 "sigs.k8s.io/kueue/client-go/clientset/versioned/typed/visibility/v1alpha1" + "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/controller/core" + "sigs.k8s.io/kueue/pkg/controller/core/indexer" + "sigs.k8s.io/kueue/pkg/queue" + "sigs.k8s.io/kueue/pkg/visibility" + "sigs.k8s.io/kueue/pkg/webhooks" + "sigs.k8s.io/kueue/test/integration/framework" + // +kubebuilder:scaffold:imports +) + +var ( + cfg *rest.Config + k8sClient client.Client + visibilityClient visibilityv1alpha1.VisibilityV1alpha1Interface + fwk *framework.Framework + ctx context.Context +) + +func TestAPIs(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + ginkgo.RunSpecs(t, + "Visibility Suite", + ) +} + +var _ = ginkgo.BeforeSuite(func() { + fwk = &framework.Framework{ + CRDPath: filepath.Join("..", "..", "..", "config", "components", "crd", "bases"), + WebhookPath: filepath.Join("..", "..", "..", "config", "components", "webhook"), + } + + cfg = fwk.Init() + serverCfg := visibility.CreateVisibilityServerConfig() + + kueueClient, err := kueueclientset.NewForConfig(serverCfg.LoopbackClientConfig) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "unable to create kueue clientset") + + visibilityClient = kueueClient.VisibilityV1alpha1() + + ctx, k8sClient = fwk.RunManager(cfg, func(mgr manager.Manager, ctx context.Context) { + err := indexer.Setup(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + failedWebhook, err := webhooks.Setup(mgr) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook) + + cCache := cache.New(mgr.GetClient()) + queues := queue.NewManager(mgr.GetClient(), cCache) + + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + configuration.QueueVisibility = &config.QueueVisibility{ + UpdateIntervalSeconds: 2, + ClusterQueues: &config.ClusterQueueVisibility{ + MaxCount: 3, + }, + } + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) + + sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName)) + err = sched.Start(ctx) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + go visibility.CreateAndStartVisibilityServerForConfig(serverCfg, queues, ctx) + + // wait for the extension API server to get started + gomega.Eventually(func() error { + result := visibilityClient.RESTClient().Get().Do(ctx) + if result.Error() != nil { + return err + } + return nil + }).Should(gomega.Succeed()) + }) +}) + +var _ = ginkgo.AfterSuite(func() { + fwk.Teardown() +}) diff --git a/test/integration/visibility/visibility_test.go b/test/integration/visibility/visibility_test.go new file mode 100644 index 0000000000..e3b75f0172 --- /dev/null +++ b/test/integration/visibility/visibility_test.go @@ -0,0 +1,449 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package visibility + +import ( + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + "sigs.k8s.io/kueue/pkg/util/testing" + "sigs.k8s.io/kueue/test/util" +) + +var pendingWorkloadCmpOpts = []cmp.Option{ + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name"), + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "CreationTimestamp"), + cmpopts.IgnoreTypes([]metav1.OwnerReference{}), +} + +var _ = Describe("Kueue visibility API", Ordered, func() { + const defaultFlavor = "default-flavor" + + var ( + resourceFlavor *kueue.ResourceFlavor + clusterQueue *kueue.ClusterQueue + localQueue *kueue.LocalQueue + ns *corev1.Namespace + ) + + BeforeAll(func() { + resourceFlavor = testing.MakeResourceFlavor(defaultFlavor).Obj() + Expect(k8sClient.Create(ctx, resourceFlavor)).To(Succeed()) + + clusterQueue = testing.MakeClusterQueue("cluster-queue"). + ResourceGroup( + *testing.MakeFlavorQuotas(defaultFlavor). + Resource(corev1.ResourceCPU, "1"). + Obj(), + ). + Obj() + Expect(k8sClient.Create(ctx, clusterQueue)).To(Succeed()) + }) + + AfterAll(func() { + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true) + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, resourceFlavor, true) + }) + + BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "e2e-", + }, + } + Expect(k8sClient.Create(ctx, ns)).To(Succeed()) + + localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj() + Expect(k8sClient.Create(ctx, localQueue)).To(Succeed()) + }) + + AfterEach(func() { + Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(Succeed()) + }) + + When("There are no workloads in the queue", func() { + It("Should report no pending workloads", func() { + By("Calling the visibility API for the cluster queue", func() { + summary, err := visibilityClient. + ClusterQueues(). + GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(summary.Items).To(HaveLen(0)) + }) + + By("Calling the visibility API for the local queue", func() { + summary, err := visibilityClient. + LocalQueues(ns.Name). + GetPendingWorkloadsSummary(ctx, localQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(summary.Items).To(HaveLen(0)) + }) + }) + }) + + When("There isn't any more quota available", func() { + var admittedWL *kueue.Workload + BeforeEach(func() { + admittedWL = testing.MakeWorkload("wl1", ns.Name). + Queue(localQueue.Name). + Request(corev1.ResourceCPU, "1"). + Obj() + Expect(k8sClient.Create(ctx, admittedWL)).To(Succeed()) + + Eventually(GetObject(admittedWL), util.Timeout, util.Interval). + Should(WithTransform(conditions, Not(BeEmpty()))) + }) + + It("Should report no pending workloads", func() { + By("Calling the visibility API for the cluster queue", func() { + summary, err := visibilityClient. + ClusterQueues(). + GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(summary.Items).To(HaveLen(0)) + }) + + By("Calling the visibility API for the local queue", func() { + summary, err := visibilityClient. + LocalQueues(ns.Name). + GetPendingWorkloadsSummary(ctx, localQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(summary.Items).To(HaveLen(0)) + }) + }) + + It("Should report a pending workload", func() { + var wl *kueue.Workload + By("Creating a second workload", func() { + wl = testing.MakeWorkload("wl2", ns.Name). + Queue(localQueue.Name). + Request(corev1.ResourceCPU, "1"). + Obj() + Expect(k8sClient.Create(ctx, wl)).To(Succeed()) + + Eventually(GetObject(wl), util.Timeout, util.Interval). + Should(WithTransform(conditions, Not(BeEmpty()))) + }) + + By("Calling the visibility API for the cluster queue", func() { + Eventually(func() []visibility.PendingWorkload { + summary, err := visibilityClient. + ClusterQueues(). + GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + return summary.Items + }, util.Timeout, util.Interval). + Should(HaveExactElements( + BeComparableTo(visibility.PendingWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + Name: wl.Name, + }, + PositionInLocalQueue: 0, + PositionInClusterQueue: 0, + LocalQueueName: localQueue.Name, + }, pendingWorkloadCmpOpts...))) + }) + + By("Calling the visibility API for the local queue", func() { + Eventually(func() []visibility.PendingWorkload { + summary, err := visibilityClient. + LocalQueues(ns.Name). + GetPendingWorkloadsSummary(ctx, localQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + return summary.Items + }, util.Timeout, util.Interval). + Should(HaveExactElements( + BeComparableTo(visibility.PendingWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + Name: wl.Name, + }, + PositionInLocalQueue: 0, + PositionInClusterQueue: 0, + LocalQueueName: localQueue.Name, + }, pendingWorkloadCmpOpts...))) + }) + }) + + It("Should not report any pending workloads after the pending workload is deleted", func() { + var wl *kueue.Workload + By("Creating a second workload", func() { + wl = testing.MakeWorkload("wl2", ns.Name). + Queue(localQueue.Name). + Request(corev1.ResourceCPU, "1"). + Obj() + Expect(k8sClient.Create(ctx, wl)).To(Succeed()) + + Eventually(GetObject(wl), util.Timeout, util.Interval). + Should(WithTransform(conditions, Not(BeEmpty()))) + }) + + By("Verifying the visibility API reports the pending workload", func() { + Eventually(func() []visibility.PendingWorkload { + summary, err := visibilityClient. + ClusterQueues(). + GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + return summary.Items + }, util.Timeout, util.Interval). + Should(HaveExactElements( + BeComparableTo(visibility.PendingWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + Name: wl.Name, + }, + PositionInLocalQueue: 0, + PositionInClusterQueue: 0, + LocalQueueName: localQueue.Name, + }, pendingWorkloadCmpOpts...))) + }) + + By("Deleting the second workload", func() { + Expect(k8sClient.Delete(ctx, wl)).To(Succeed()) + Eventually(ErrorFrom(GetObject(wl))). + Should(MatchError(errors.NewNotFound(kueue.Resource("workloads"), wl.Name))) + }) + + By("Calling the visibility API for the cluster queue", func() { + Eventually(func() []visibility.PendingWorkload { + summary, err := visibilityClient. + ClusterQueues(). + GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + return summary.Items + }, util.Timeout, util.Interval).Should(HaveLen(0)) + }) + + By("Calling the visibility API for the local queue", func() { + Eventually(func() []visibility.PendingWorkload { + summary, err := visibilityClient. + LocalQueues(ns.Name). + GetPendingWorkloadsSummary(ctx, localQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + return summary.Items + }, util.Timeout, util.Interval).Should(HaveLen(0)) + }) + }) + + It("Should not report any pending workloads after the admitted workload is deleted", func() { + var wl *kueue.Workload + By("Creating a second workload", func() { + wl = testing.MakeWorkload("wl2", ns.Name). + Queue(localQueue.Name). + Request(corev1.ResourceCPU, "1"). + Obj() + Expect(k8sClient.Create(ctx, wl)).To(Succeed()) + + Eventually(GetObject(wl), util.Timeout, util.Interval). + Should(WithTransform(conditions, Not(BeEmpty()))) + }) + + By("Verifying the visibility API reports the pending workload", func() { + Eventually(func() []visibility.PendingWorkload { + summary, err := visibilityClient. + ClusterQueues(). + GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + return summary.Items + }, util.Timeout, util.Interval). + Should(HaveExactElements( + BeComparableTo(visibility.PendingWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + Name: wl.Name, + }, + PositionInLocalQueue: 0, + PositionInClusterQueue: 0, + LocalQueueName: localQueue.Name, + }, pendingWorkloadCmpOpts...))) + }) + + By("Deleting the admitted workload", func() { + Expect(k8sClient.Delete(ctx, admittedWL)).To(Succeed()) + Eventually(ErrorFrom(GetObject(admittedWL))). + Should(MatchError(errors.NewNotFound(kueue.Resource("workloads"), admittedWL.Name))) + }) + + By("Calling the visibility API for the cluster queue", func() { + Eventually(func() []visibility.PendingWorkload { + summary, err := visibilityClient. + ClusterQueues(). + GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + return summary.Items + }, util.Timeout, util.Interval).Should(HaveLen(0)) + }) + + By("Calling the visibility API for the local queue", func() { + Eventually(func() []visibility.PendingWorkload { + summary, err := visibilityClient. + LocalQueues(ns.Name). + GetPendingWorkloadsSummary(ctx, localQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + return summary.Items + }, util.Timeout, util.Interval).Should(HaveLen(0)) + }) + }) + + It("Should report pending workloads according to their priorities", func() { + var ( + wl2 *kueue.Workload + wl3 *kueue.Workload + wl4 *kueue.Workload + ) + By("Creating three workloads with different workload priorities", func() { + wl2 = testing.MakeWorkload("wl2", ns.Name). + Queue(localQueue.Name). + Request(corev1.ResourceCPU, "1"). + Priority(1). + Obj() + Expect(k8sClient.Create(ctx, wl2)).To(Succeed()) + + Eventually(GetObject(wl2), util.Timeout, util.Interval). + Should(WithTransform(conditions, Not(BeEmpty()))) + + wl3 = testing.MakeWorkload("wl3", ns.Name). + Queue(localQueue.Name). + Request(corev1.ResourceCPU, "1"). + Priority(2). + Obj() + Expect(k8sClient.Create(ctx, wl3)).To(Succeed()) + + Eventually(GetObject(wl3), util.Timeout, util.Interval). + Should(WithTransform(conditions, Not(BeEmpty()))) + + wl4 = testing.MakeWorkload("wl4", ns.Name). + Queue(localQueue.Name). + Request(corev1.ResourceCPU, "1"). + Priority(3). + Obj() + Expect(k8sClient.Create(ctx, wl4)).To(Succeed()) + + Eventually(GetObject(wl4), util.Timeout, util.Interval). + Should(WithTransform(conditions, Not(BeEmpty()))) + }) + + By("Verifying the visibility API reports the correct positions and priorities", func() { + Eventually(func() []visibility.PendingWorkload { + summary, err := visibilityClient. + ClusterQueues(). + GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + return summary.Items + }, util.Timeout, util.Interval). + Should(HaveExactElements( + BeComparableTo(visibility.PendingWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + Name: wl4.Name, + }, + PositionInLocalQueue: 0, + PositionInClusterQueue: 0, + Priority: 3, + LocalQueueName: localQueue.Name, + }, pendingWorkloadCmpOpts...), + BeComparableTo(visibility.PendingWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + Name: wl3.Name, + }, + PositionInLocalQueue: 1, + PositionInClusterQueue: 1, + Priority: 2, + LocalQueueName: localQueue.Name, + }, pendingWorkloadCmpOpts...), + BeComparableTo(visibility.PendingWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + Name: wl2.Name, + }, + PositionInLocalQueue: 2, + PositionInClusterQueue: 2, + Priority: 1, + LocalQueueName: localQueue.Name, + }, pendingWorkloadCmpOpts...), + )) + }) + + By("Deleting a pending workload", func() { + Expect(k8sClient.Delete(ctx, wl3)).To(Succeed()) + Eventually(ErrorFrom(GetObject(wl3))). + Should(MatchError(errors.NewNotFound(kueue.Resource("workloads"), wl3.Name))) + }) + + By("Verifying the visibility API reports the correct positions and priorities", func() { + Eventually(func() []visibility.PendingWorkload { + summary, err := visibilityClient. + ClusterQueues(). + GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + return summary.Items + }, util.Timeout, util.Interval). + Should(HaveExactElements( + BeComparableTo(visibility.PendingWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + Name: wl4.Name, + }, + PositionInLocalQueue: 0, + PositionInClusterQueue: 0, + Priority: 3, + LocalQueueName: localQueue.Name, + }, pendingWorkloadCmpOpts...), + BeComparableTo(visibility.PendingWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + Name: wl2.Name, + }, + PositionInLocalQueue: 1, + PositionInClusterQueue: 1, + Priority: 1, + LocalQueueName: localQueue.Name, + }, pendingWorkloadCmpOpts...), + )) + }) + }) + }) +}) + +func GetObject(obj client.Object) func(g Gomega) (client.Object, error) { + return func(g Gomega) (client.Object, error) { + err := k8sClient.Get(ctx, client.ObjectKeyFromObject(obj), obj) + return obj, err + } +} + +func ErrorFrom[T any](fn func(g Gomega) (T, error)) func(g Gomega) error { + return func(g Gomega) error { + _, err := fn(g) + return err + } +} + +func conditions(wl *kueue.Workload) []metav1.Condition { + return wl.Status.Conditions +}