diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go index 918bb967ce..f21f2dd79d 100644 --- a/apis/config/v1beta1/configuration_types.go +++ b/apis/config/v1beta1/configuration_types.go @@ -301,6 +301,17 @@ type Integrations struct { Frameworks []string `json:"frameworks,omitempty"` // PodOptions defines kueue controller behaviour for pod objects PodOptions *PodIntegrationOptions `json:"podOptions,omitempty"` + + // labelKeysToCopy is a list of label keys that should be copied from the job into the + // workload object. It is not required for the job to have all the labels from this + // list. If a job does not have some label with the given key from this list, the + // constructed workload object will be created without this label. In the case + // of creating a workload from a composable job (pod group), if multiple objects + // have labels with some key from the list, the values of these labels must + // match or otherwise the workload creation would fail. The labels are copied only + // during the workload creation and are not updated even if the labels of the + // underlying job are changed. + LabelKeysToCopy []string `json:"labelKeysToCopy,omitempty"` } type PodIntegrationOptions struct { diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go index 000f346d73..f53a7f8fa7 100644 --- a/apis/config/v1beta1/zz_generated.deepcopy.go +++ b/apis/config/v1beta1/zz_generated.deepcopy.go @@ -245,6 +245,11 @@ func (in *Integrations) DeepCopyInto(out *Integrations) { *out = new(PodIntegrationOptions) (*in).DeepCopyInto(*out) } + if in.LabelKeysToCopy != nil { + in, out := &in.LabelKeysToCopy, &out.LabelKeysToCopy + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Integrations. diff --git a/cmd/importer/pod/import.go b/cmd/importer/pod/import.go index 10b0c3e950..5f39138d3d 100644 --- a/cmd/importer/pod/import.go +++ b/cmd/importer/pod/import.go @@ -68,7 +68,7 @@ func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs kp := pod.FromObject(p) // Note: the recorder is not used for single pods, we can just pass nil for now. 
- wl, err := kp.ConstructComposableWorkload(ctx, c, nil) + wl, err := kp.ConstructComposableWorkload(ctx, c, nil, nil) if err != nil { return false, fmt.Errorf("construct workload: %w", err) } diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index 2fc7905b29..4b0bd26bdd 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -269,6 +269,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag jobframework.WithIntegrationOptions(corev1.SchemeGroupVersion.WithKind("Pod").String(), cfg.Integrations.PodOptions), jobframework.WithEnabledFrameworks(cfg.Integrations), jobframework.WithManagerName(constants.KueueName), + jobframework.WithLabelKeysToCopy(cfg.Integrations.LabelKeysToCopy), } if err := jobframework.SetupControllers(mgr, setupLog, opts...); err != nil { setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion()) diff --git a/keps/1834-copy-labels-into-workload/README.md b/keps/1834-copy-labels-into-workload/README.md new file mode 100644 index 0000000000..2a2294bad0 --- /dev/null +++ b/keps/1834-copy-labels-into-workload/README.md @@ -0,0 +1,121 @@ +# KEP-1834: Copy some labels from Pods/Jobs into the Workload object + + +- [Summary](#summary) +- [Motivation](#motivation) + - [Goals](#goals) + - [Non-Goals](#non-goals) +- [Proposal](#proposal) + - [Risks and Mitigations](#risks-and-mitigations) +- [Design Details](#design-details) + - [Test Plan](#test-plan) + - [Prerequisite testing updates](#prerequisite-testing-updates) + - [Unit Tests](#unit-tests) + - [Integration tests](#integration-tests) + - [Graduation Criteria](#graduation-criteria) +- [Implementation History](#implementation-history) +- [Drawbacks](#drawbacks) +- [Alternatives](#alternatives) + + +## Summary + +When creating a Kueue workload, copy the labels of the job (or pod) into the workload object (see https://github.com/kubernetes-sigs/kueue/issues/1834). Allow configuring which labels should be copied. + +## Motivation + +Currently, workloads do not "inherit" any labels from the jobs they are created for. Since workloads in Kueue are internal representations of Kubernetes jobs, making it easy to copy labels from the job object into the workload object is natural and useful. For instance, it facilitates selection and filtering of workloads by administrators. + +### Goals + +* Establish a mechanism to configure which set of labels should be copied. +* Copy the selected labels when creating the workload based on a job. + +### Non-Goals +* This proposal does not include any validation or analysis of the labels; they are copied verbatim from one object to another. +* This proposal only concerns copying labels into newly created workloads. Updating the labels of an existing workload (for example, when a label of the underlying job changes) is out of scope of this proposal. + +## Proposal + +We propose the following API change: add a field named `labelKeysToCopy` to the configuration API (under `Integrations`). This field holds a list of keys of labels that should be copied. The configuration is global in the sense that it applies to all job frameworks. Since the list `labelKeysToCopy` is empty by default, this change does not affect existing functionality. + +We will not require all the labels with keys from `labelKeysToCopy` to be present on the job object.
If a job does not have a label with one of the listed keys, the workload is simply created without that label. + +A case that requires more attention is creating workloads from pod groups, because in that case a single workload is based on multiple pods (each of which might have labels). We propose: + * When a label from the `labelKeysToCopy` list is present on some of the pods in the group and its value is identical on all of those pods, the label is copied into the workload. Note that not every pod needs to have the label, but all pods that do have it must agree on its value. + * When pods in the group have the label with different values, workload creation fails with an error. + + +### Risks and Mitigations + +None. + +## Design Details + + +The proposal contains a single modification to the API: adding a new field, `LabelKeysToCopy`, to the `Integrations` struct in the `Configuration` API (an example configuration is shown at the end of this document). +``` go +type Integrations struct { + ... + // labelKeysToCopy is a list of label keys that should be copied from the job into the + // workload object. It is not required for the job to have all the labels from this + // list. If a job does not have some label with the given key from this list, the + // constructed workload object will be created without this label. In the case + // of creating a workload from a composable job (pod group), if multiple objects + // have labels with some key from the list, the values of these labels must + // match or otherwise the workload creation would fail. The labels are copied only + // during the workload creation and are not updated even if the labels of the + // underlying job are changed. + LabelKeysToCopy []string `json:"labelKeysToCopy,omitempty"` + ... +} +``` +### Test Plan + + + +[x] I/we understand the owners of the involved components may require updates to +existing tests to make this code solid enough prior to committing the changes necessary +to implement this enhancement. + +##### Prerequisite testing updates + + + +#### Unit Tests + +New unit tests should be added covering the functionality for jobs and pods. + +#### Integration tests + +The existing integration tests will be extended to check that workload objects are created with the correct labels. + +### Graduation Criteria + +## Implementation History + +* 2024-04-08 First draft + +## Drawbacks + +With this KEP, some workloads that would previously have been created successfully could fail to be created if they are based on a pod group with mismatched labels (i.e., pods in the same pod group having different values for a label key that should be copied). This can only happen if a user explicitly configures that label key to be copied. + +This proposal introduces label copying only at workload creation time. If a user modifies labels on a running job, the modification is not reflected on the workload object, which might be confusing. + +## Alternatives + +An alternative way of handling mismatched labels would be to copy an arbitrary value from among the values on the individual pods. The downside of this approach is that such a mismatch is most likely the result of an error and should not be silently accepted. + +Another alternative would be to copy all labels by default and maintain a list of excluded keys.
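For illustration, here is a minimal sketch of how the new field might be set in the Kueue `Configuration`. This snippet is not part of the change itself; the framework names and label keys are placeholders chosen for the example.

``` yaml
apiVersion: config.kueue.x-k8s.io/v1beta1
kind: Configuration
integrations:
  frameworks:
  - "batch/job"
  - "pod"
  # Hypothetical keys chosen by the administrator; any label keys may be listed.
  labelKeysToCopy:
  - "team"
  - "cost-center"
```

With such a configuration, a `team` label present on a Job (or, with a consistent value, on the pods of a pod group) would be copied onto the generated Workload, while labels with other keys would be ignored.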
\ No newline at end of file diff --git a/keps/1834-copy-labels-into-workload/kep.yaml b/keps/1834-copy-labels-into-workload/kep.yaml new file mode 100644 index 0000000000..8110297ac8 --- /dev/null +++ b/keps/1834-copy-labels-into-workload/kep.yaml @@ -0,0 +1,25 @@ +title: Copy labels into workload +kep-number: 1834 +authors: + - "@pajakd" +status: implementable +creation-date: 2024-04-08 +reviewers: + - "@alculquicondor" + - "@mimowo" +approvers: + - "@alculquicondor" + - "@tenzen-y" + +# The target maturity stage in the current dev cycle for this KEP. +stage: beta + +# The most recent milestone for which work toward delivery of this KEP has been +# done. This can be the current (upcoming) milestone, if it is being actively +# worked on. +latest-milestone: "v0.7" + +# The milestone at which this feature was, or is targeted to be, at each stage. +milestone: + alpha: "v0.7" + beta: "v0.7" diff --git a/pkg/controller/jobframework/interface.go b/pkg/controller/jobframework/interface.go index 45535c3b7e..5bf6d77b07 100644 --- a/pkg/controller/jobframework/interface.go +++ b/pkg/controller/jobframework/interface.go @@ -104,7 +104,7 @@ type ComposableJob interface { // counts extracting from workload to all members of the ComposableJob. Run(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, r record.EventRecorder, msg string) error // ConstructComposableWorkload returns a new Workload that's assembled out of all members of the ComposableJob. - ConstructComposableWorkload(ctx context.Context, c client.Client, r record.EventRecorder) (*kueue.Workload, error) + ConstructComposableWorkload(ctx context.Context, c client.Client, r record.EventRecorder, labelKeysToCopy []string) (*kueue.Workload, error) // ListChildWorkloads returns all workloads related to the composable job ListChildWorkloads(ctx context.Context, c client.Client, parent types.NamespacedName) (*kueue.WorkloadList, error) // FindMatchingWorkloads returns all related workloads, workload that matches the ComposableJob and duplicates that has to be deleted. diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index 36159f21cc..833f34dc1a 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -66,6 +66,7 @@ type JobReconciler struct { record record.EventRecorder manageJobsWithoutQueueName bool waitForPodsReady bool + labelKeysToCopy []string } type Options struct { @@ -76,6 +77,7 @@ type Options struct { IntegrationOptions map[string]any EnabledFrameworks sets.Set[string] ManagerName string + LabelKeysToCopy []string } // Option configures the reconciler. 
@@ -140,6 +142,13 @@ func WithManagerName(n string) Option { } } +// WithLabelKeysToCopy sets the list of label keys that should be copied from the job into the workload object. +func WithLabelKeysToCopy(n []string) Option { + return func(o *Options) { + o.LabelKeysToCopy = n + } +} + var defaultOptions = Options{} func NewReconciler( @@ -153,6 +162,7 @@ record: record, manageJobsWithoutQueueName: options.ManageJobsWithoutQueueName, waitForPodsReady: options.WaitForPodsReady, + labelKeysToCopy: options.LabelKeysToCopy, } } @@ -789,11 +799,10 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o log := ctrl.LoggerFrom(ctx) if cj, implements := job.(ComposableJob); implements { - wl, err := cj.ConstructComposableWorkload(ctx, r.client, r.record) + wl, err := cj.ConstructComposableWorkload(ctx, r.client, r.record, r.labelKeysToCopy) if err != nil { return nil, err } - return wl, nil } @@ -803,7 +812,7 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o ObjectMeta: metav1.ObjectMeta{ Name: GetWorkloadNameForOwnerWithGVK(object.GetName(), object.GetUID(), job.GVK()), Namespace: object.GetNamespace(), - Labels: map[string]string{}, + Labels: maps.FilterKeys(job.Object().GetLabels(), r.labelKeysToCopy), Finalizers: []string{kueue.ResourceInUseFinalizerName}, Annotations: admissioncheck.FilterProvReqAnnotations(job.Object().GetAnnotations()), }, @@ -812,7 +821,9 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o QueueName: QueueName(job), }, } - + if wl.Labels == nil { + wl.Labels = make(map[string]string) + } jobUid := string(job.Object().GetUID()) if errs := validation.IsValidLabelValue(jobUid); len(errs) == 0 { wl.Labels[controllerconsts.JobUIDLabel] = jobUid diff --git a/pkg/controller/jobframework/reconciler_test.go b/pkg/controller/jobframework/reconciler_test.go index ff854e4703..12fcde150b 100644 --- a/pkg/controller/jobframework/reconciler_test.go +++ b/pkg/controller/jobframework/reconciler_test.go @@ -114,6 +114,7 @@ func TestProcessOptions(t *testing.T) { WithIntegrationOptions(corev1.SchemeGroupVersion.WithKind("Pod").String(), &configapi.PodIntegrationOptions{ PodSelector: &metav1.LabelSelector{}, }), + WithLabelKeysToCopy([]string{"toCopyKey"}), }, wantOpts: Options{ ManageJobsWithoutQueueName: true, @@ -124,6 +125,7 @@ func TestProcessOptions(t *testing.T) { PodSelector: &metav1.LabelSelector{}, }, }, + LabelKeysToCopy: []string{"toCopyKey"}, }, }, "a single option is passed": { @@ -143,6 +145,7 @@ func TestProcessOptions(t *testing.T) { WaitForPodsReady: false, KubeServerVersion: nil, IntegrationOptions: nil, + LabelKeysToCopy: nil, }, }, } diff --git a/pkg/controller/jobs/job/job_controller_test.go b/pkg/controller/jobs/job/job_controller_test.go index ed2d635754..3b22b229af 100644 --- a/pkg/controller/jobs/job/job_controller_test.go +++ b/pkg/controller/jobs/job/job_controller_test.go @@ -457,6 +457,41 @@ func TestReconciler(t *testing.T) { }, }, }, + "when workload is created, it has correct labels set": { + job: *baseJobWrapper.Clone(). + Label("toCopyKey", "toCopyValue"). + Label("dontCopyKey", "dontCopyValue"). + UID("test-uid"). + Obj(), + wantJob: *baseJobWrapper.Clone(). + Label("toCopyKey", "toCopyValue"). + Label("dontCopyKey", "dontCopyValue"). + UID("test-uid"). + Suspend(true). + Obj(), + reconcilerOptions: []jobframework.Option{ + jobframework.WithLabelKeysToCopy([]string{"toCopyKey", "redundantToCopyKey"}), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("job", "ns"). + Finalizers(kueue.ResourceInUseFinalizerName).
+ PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()). + Queue("foo"). + Priority(0). + Labels(map[string]string{ + controllerconsts.JobUIDLabel: "test-uid", + "toCopyKey": "toCopyValue"}). + Obj(), + }, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Name: "job", Namespace: "ns"}, + EventType: "Normal", + Reason: "CreatedWorkload", + Message: "Created Workload: ns/" + GetWorkloadNameForJob(baseJobWrapper.Name, types.UID("test-uid")), + }, + }, + }, "when workload is admitted the PodSetUpdates are propagated to job": { job: *baseJobWrapper.Clone(). Obj(), diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index 39c5e97969..eecf864c69 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -53,6 +53,7 @@ import ( "sigs.k8s.io/kueue/pkg/podset" "sigs.k8s.io/kueue/pkg/util/admissioncheck" "sigs.k8s.io/kueue/pkg/util/kubeversion" + "sigs.k8s.io/kueue/pkg/util/maps" "sigs.k8s.io/kueue/pkg/util/parallelize" utilslices "sigs.k8s.io/kueue/pkg/util/slices" ) @@ -78,6 +79,7 @@ var ( errIncorrectReconcileRequest = fmt.Errorf("event handler error: got a single pod reconcile request for a pod group") errPendingOps = jobframework.UnretryableError("waiting to observe previous operations on pods") errPodNoSupportKubeVersion = errors.New("pod integration only supported in Kubernetes 1.27 or newer") + errPodGroupLabelsMismatch = errors.New("constructing workload: pods have different label values") ) func init() { @@ -898,7 +900,30 @@ func (p *Pod) ensureWorkloadOwnedByAllMembers(ctx context.Context, c client.Clie return nil } -func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client, r record.EventRecorder) (*kueue.Workload, error) { +func (p *Pod) getWorkloadLabels(labelKeysToCopy []string) (map[string]string, error) { + if len(labelKeysToCopy) == 0 { + return nil, nil + } + if !p.isGroup { + return maps.FilterKeys(p.Object().GetLabels(), labelKeysToCopy), nil + } + workloadLabels := make(map[string]string, len(labelKeysToCopy)) + for _, pod := range p.list.Items { + for _, labelKey := range labelKeysToCopy { + labelValuePod, foundInPod := pod.Labels[labelKey] + labelValueWorkload, foundInWorkload := workloadLabels[labelKey] + if foundInPod && foundInWorkload && (labelValuePod != labelValueWorkload) { + return nil, errPodGroupLabelsMismatch + } + if foundInPod { + workloadLabels[labelKey] = labelValuePod + } + } + } + return workloadLabels, nil +} + +func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client, r record.EventRecorder, labelKeysToCopy []string) (*kueue.Workload, error) { object := p.Object() log := ctrl.LoggerFrom(ctx) @@ -934,7 +959,11 @@ func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client, if err := controllerutil.SetControllerReference(object, wl, c.Scheme()); err != nil { return nil, err } - + labelsToCopy, err := p.getWorkloadLabels(labelKeysToCopy) + if err != nil { + return nil, err + } + wl.Labels = maps.MergeKeepFirst(wl.Labels, labelsToCopy) return wl, nil } @@ -988,7 +1017,11 @@ func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client, return nil, err } } - + labelsToCopy, err := p.getWorkloadLabels(labelKeysToCopy) + if err != nil { + return nil, err + } + wl.Labels = maps.MergeKeepFirst(wl.Labels, labelsToCopy) return wl, nil } diff --git a/pkg/controller/jobs/pod/pod_controller_test.go 
b/pkg/controller/jobs/pod/pod_controller_test.go index 1dde6f6a6d..89d3891d03 100644 --- a/pkg/controller/jobs/pod/pod_controller_test.go +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -163,7 +163,8 @@ func TestReconciler(t *testing.T) { // If true, the test will delete workloads before running reconcile deleteWorkloads bool - wantEvents []utiltesting.EventRecord + wantEvents []utiltesting.EventRecord + reconcilerOptions []jobframework.Option }{ "scheduling gate is removed and node selector is added if workload is admitted": { initObjects: []client.Object{ @@ -3371,6 +3372,138 @@ func TestReconciler(t *testing.T) { }, }, }, + "workload is created with correct labels for a single pod": { + pods: []corev1.Pod{*basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + Label("toCopyKey", "toCopyValue"). + Label("dontCopyKey", "dontCopyValue"). + KueueFinalizer(). + KueueSchedulingGate(). + Queue("test-queue"). + Obj()}, + wantPods: nil, + reconcilerOptions: []jobframework.Option{ + jobframework.WithLabelKeysToCopy([]string{"toCopyKey", "keyAbsentInThePod"}), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload(GetWorkloadNameForPod(basePodWrapper.GetName(), basePodWrapper.GetUID()), "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets( + *utiltesting.MakePodSet(kueue.DefaultPodSetName, 1). + Request(corev1.ResourceCPU, "1"). + SchedulingGates(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}). + Obj(), + ). + Queue("test-queue"). + Priority(0). + ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "test-uid"). + Labels(map[string]string{ + controllerconsts.JobUIDLabel: "test-uid", + "toCopyKey": "toCopyValue", + }). + Obj(), + }, + workloadCmpOpts: defaultWorkloadCmpOpts, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Name: "pod", Namespace: "ns"}, + EventType: "Normal", + Reason: "CreatedWorkload", + Message: "Created Workload: ns/" + GetWorkloadNameForPod(basePodWrapper.GetName(), basePodWrapper.GetUID()), + }, + }, + }, + "workload is created with correct labels for pod group": { + pods: []corev1.Pod{ + *basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + Label("toCopyKey1", "toCopyValue1"). + Label("dontCopyKey", "dontCopyValue"). + KueueFinalizer(). + KueueSchedulingGate(). + Group("test-group"). + GroupTotalCount("2"). + Obj(), + *basePodWrapper. + Clone(). + Name("pod2"). + Label("kueue.x-k8s.io/managed", "true"). + Label("toCopyKey1", "toCopyValue1"). + Label("toCopyKey2", "toCopyValue2"). + Label("dontCopyKey", "dontCopyValue"). + KueueFinalizer(). + KueueSchedulingGate(). + Group("test-group"). + GroupTotalCount("2"). + Obj(), + }, + wantPods: nil, + reconcilerOptions: []jobframework.Option{ + jobframework.WithLabelKeysToCopy([]string{"toCopyKey1", "toCopyKey2"}), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets( + *utiltesting.MakePodSet("dc85db45", 2). + Request(corev1.ResourceCPU, "1"). + SchedulingGates(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}). + Obj(), + ). + Queue("user-queue"). + Priority(0). + OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "test-uid"). + OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod2", "test-uid"). + Annotations(map[string]string{ + "kueue.x-k8s.io/is-group-workload": "true"}). + Labels(map[string]string{ + "toCopyKey1": "toCopyValue1", + "toCopyKey2": "toCopyValue2", + }). 
+ Obj(), + }, + workloadCmpOpts: defaultWorkloadCmpOpts, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Name: "pod", Namespace: "ns"}, + EventType: "Normal", + Reason: "CreatedWorkload", + Message: "Created Workload: ns/test-group", + }, + }, + }, + "reconciler returns error in case of label mismatch in pod group": { + pods: []corev1.Pod{ + *basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + Label("toCopyKey1", "toCopyValue1"). + Label("dontCopyKey", "dontCopyValue"). + KueueFinalizer(). + KueueSchedulingGate(). + Group("test-group"). + GroupTotalCount("2"). + Obj(), + *basePodWrapper. + Clone(). + Name("pod2"). + Label("kueue.x-k8s.io/managed", "true"). + Label("toCopyKey1", "otherValue"). + Label("toCopyKey2", "toCopyValue2"). + Label("dontCopyKey", "dontCopyValue"). + KueueFinalizer(). + KueueSchedulingGate(). + Group("test-group"). + GroupTotalCount("2"). + Obj(), + }, + wantPods: nil, + reconcilerOptions: []jobframework.Option{ + jobframework.WithLabelKeysToCopy([]string{"toCopyKey1", "toCopyKey2"}), + }, + wantWorkloads: nil, + wantErr: errPodGroupLabelsMismatch, + }, } for name, tc := range testCases { @@ -3403,7 +3536,7 @@ func TestReconciler(t *testing.T) { } } recorder := &utiltesting.EventRecorder{} - reconciler := NewReconciler(kClient, recorder) + reconciler := NewReconciler(kClient, recorder, tc.reconcilerOptions...) pReconciler := reconciler.(*Reconciler) for _, e := range tc.excessPodsExpectations { pReconciler.expectationsStore.ExpectUIDs(log, e.key, e.uids) diff --git a/pkg/util/maps/maps.go b/pkg/util/maps/maps.go index 3d0e89e2e1..675d72fa50 100644 --- a/pkg/util/maps/maps.go +++ b/pkg/util/maps/maps.go @@ -94,3 +94,17 @@ func Keys[K comparable, V any, M ~map[K]V](m M) []K { } return ret } + +// Filter returns a sub-map containing only keys from the given list +func FilterKeys[K comparable, V any, M ~map[K]V](m M, k []K) M { + if m == nil || len(k) == 0 { + return nil + } + ret := make(M, len(k)) + for _, key := range k { + if v, found := m[key]; found { + ret[key] = v + } + } + return ret +} diff --git a/pkg/util/maps/maps_test.go b/pkg/util/maps/maps_test.go index 599064c0bc..77e9db741e 100644 --- a/pkg/util/maps/maps_test.go +++ b/pkg/util/maps/maps_test.go @@ -203,3 +203,81 @@ func TestMergeIntersect(t *testing.T) { }) } } + +func TestFilterKeys(t *testing.T) { + cases := map[string]struct { + m map[string]int + k []string + want map[string]int + }{ + "nil m": { + m: nil, + k: []string{ + "k1", + }, + want: nil, + }, + "nil k": { + m: map[string]int{ + "v1": 1, + }, + k: nil, + want: nil, + }, + "empty k": { + m: map[string]int{ + "v1": 1, + "v2": 2, + "v3": 3, + }, + k: []string{}, + want: nil, + }, + "empty m": { + m: map[string]int{}, + k: []string{"k1"}, + want: map[string]int{}, + }, + "filter one": { + m: map[string]int{ + "v1": 1, + "v2": 2, + "v3": 3, + }, + k: []string{"v1", "v3"}, + want: map[string]int{ + "v1": 1, + "v3": 3, + }, + }, + "filter two": { + m: map[string]int{ + "v1": 1, + "v2": 2, + "v3": 3, + }, + k: []string{"v1"}, + want: map[string]int{ + "v1": 1, + }, + }, + "filter all": { + m: map[string]int{ + "v1": 1, + "v2": 2, + "v3": 3, + }, + k: []string{"v4"}, + want: map[string]int{}, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := FilterKeys(tc.m, tc.k) + if diff := cmp.Diff(got, tc.want); diff != "" { + t.Errorf("Unexpected result, expecting %v", tc.want) + } + }) + } +} diff --git a/test/integration/controller/jobs/job/job_controller_test.go 
b/test/integration/controller/jobs/job/job_controller_test.go index 585ce59f6d..279c5405f6 100644 --- a/test/integration/controller/jobs/job/job_controller_test.go +++ b/test/integration/controller/jobs/job/job_controller_test.go @@ -71,6 +71,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu cfg = fwk.Init() ctx, k8sClient = fwk.RunManager(cfg, managerSetup( jobframework.WithManageJobsWithoutQueueName(true), + jobframework.WithLabelKeysToCopy([]string{"toCopyKey"}), )) }) ginkgo.AfterAll(func() { @@ -108,6 +109,8 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu PriorityClass(priorityClassName). SetAnnotation("provreq.kueue.x-k8s.io/ValidUntilSeconds", "0"). SetAnnotation("invalid-provreq-prefix/Foo", "Bar"). + Label("toCopyKey", "toCopyValue"). + Label("doNotCopyKey", "doNotCopyValue"). Obj() gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) lookupKey := types.NamespacedName{Name: jobName, Namespace: ns.Name} @@ -135,6 +138,9 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu gomega.Expect(*createdWorkload.Spec.Priority).Should(gomega.Equal(int32(priorityValue))) gomega.Expect(createdWorkload.Annotations).Should(gomega.Equal(map[string]string{"provreq.kueue.x-k8s.io/ValidUntilSeconds": "0"})) + gomega.Expect(createdWorkload.Labels["toCopyKey"]).Should(gomega.Equal("toCopyValue")) + gomega.Expect(createdWorkload.Labels).ShouldNot(gomega.ContainElement("doNotCopyValue")) + ginkgo.By("checking the workload is updated with queue name when the job does") jobQueueName := "test-queue" createdJob.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName} @@ -146,6 +152,17 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu return createdWorkload.Spec.QueueName == jobQueueName }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + ginkgo.By("checking the workload label is not updated when the job label is") + newJobLabelValue := "updatedValue" + createdJob.Labels["toCopyKey"] = newJobLabelValue + gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed()) + gomega.Consistently(func() string { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { + return "" + } + return createdWorkload.Labels["toCopyKey"] + }, util.ConsistentDuration, util.Interval).Should(gomega.Equal("toCopyValue")) + ginkgo.By("updated workload should have the same created timestamp", func() { gomega.Expect(createdWorkload.CreationTimestamp).Should(gomega.Equal(createdTime)) }) diff --git a/test/integration/controller/jobs/pod/pod_controller_test.go b/test/integration/controller/jobs/pod/pod_controller_test.go index 057576d30a..9a3b476d93 100644 --- a/test/integration/controller/jobs/pod/pod_controller_test.go +++ b/test/integration/controller/jobs/pod/pod_controller_test.go @@ -85,6 +85,7 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu }, }, }), + jobframework.WithLabelKeysToCopy([]string{"toCopyKey"}), )) gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).To(gomega.Succeed()) gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed()) @@ -509,12 +510,15 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu Group("test-group"). GroupTotalCount("2"). Queue("test-queue"). + Label("dontCopyKey", "dontCopyValue"). Obj() pod2 := testingpod.MakePod("test-pod2", ns.Name). Group("test-group"). GroupTotalCount("2"). Request(corev1.ResourceCPU, "1"). 
Queue("test-queue"). + Label("toCopyKey", "toCopyValue"). + Label("dontCopyKey", "dontCopyAnotherValue"). Obj() pod1LookupKey := client.ObjectKeyFromObject(pod1) pod2LookupKey := client.ObjectKeyFromObject(pod2) @@ -569,6 +573,9 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu }, wlConditionCmpOpts..., )) + ginkgo.By("Checking the workload gets assigned the correct labels.") + gomega.Expect(createdWorkload.Labels["toCopyKey"]).Should(gomega.Equal("toCopyValue")) + gomega.Expect(createdWorkload.Labels).ShouldNot(gomega.ContainElement("doNotCopyValue")) }) ginkgo.By("checking that pod group is finalized when all pods in the group succeed", func() {