diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go index dcdaa35d83..e59eb7eaf5 100644 --- a/apis/config/v1beta1/configuration_types.go +++ b/apis/config/v1beta1/configuration_types.go @@ -76,6 +76,10 @@ type Configuration struct { // Resources provides additional configuration options for handling the resources. Resources *Resources `json:"resources,omitempty"` + + // ObjectRetentionPolicies provides configuration options for retention of Kueue owned + // objects. A nil value will disable automatic deletion for all objects. + ObjectRetentionPolicies *ObjectRetentionPolicies `json:"objectRetentionPolicies,omitempty"` } type ControllerManager struct { @@ -400,3 +404,15 @@ type FairSharing struct { // The default strategy is ["LessThanOrEqualToFinalShare", "LessThanInitialShare"]. PreemptionStrategies []PreemptionStrategy `json:"preemptionStrategies,omitempty"` } + +type ObjectRetentionPolicies struct { + // FinishedWorkloadRetention is the duration to retain finished Workloads. + // A duration of 0 will delete finished Workloads immediately. + // A nil value will disable automatic deletion. + // The value is represented using the metav1.Duration format, allowing for flexible + // specification of time units (e.g., "24h", "1h30m", "30s"). + // + // Defaults to null. + // +optional + FinishedWorkloadRetention *metav1.Duration `json:"finishedWorkloadRetention"` +} diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go index c9c546a072..060966524b 100644 --- a/apis/config/v1beta1/defaults.go +++ b/apis/config/v1beta1/defaults.go @@ -193,4 +193,7 @@ func SetDefaults_Configuration(cfg *Configuration) { if fs := cfg.FairSharing; fs != nil && fs.Enable && len(fs.PreemptionStrategies) == 0 { fs.PreemptionStrategies = []PreemptionStrategy{LessThanOrEqualToFinalShare, LessThanInitialShare} } + if cfg.ObjectRetentionPolicies == nil { + cfg.ObjectRetentionPolicies = &ObjectRetentionPolicies{} + } } diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go index 99a9c42ee5..9cfcabfd66 100644 --- a/apis/config/v1beta1/defaults_test.go +++ b/apis/config/v1beta1/defaults_test.go @@ -102,6 +102,10 @@ func TestSetDefaults_Configuration(t *testing.T) { WorkerLostTimeout: &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout}, } + defaultObjectRetentionPolicies := &ObjectRetentionPolicies{ + FinishedWorkloadRetention: nil, + } + podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout} podsReadyTimeoutOverwrite := metav1.Duration{Duration: time.Minute} @@ -121,10 +125,11 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: ptr.To(false), }, - ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "defaulting ControllerManager": { @@ -162,10 +167,11 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: ptr.To(false), }, - ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "should not default ControllerManager": { @@ -219,10 +225,11 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: ptr.To(false), }, - ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "should not set LeaderElectionID": { @@ -260,10 +267,11 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: ptr.To(false), }, - ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "defaulting InternalCertManagement": { @@ -278,10 +286,11 @@ func TestSetDefaults_Configuration(t *testing.T) { WebhookServiceName: ptr.To(DefaultWebhookServiceName), WebhookSecretName: ptr.To(DefaultWebhookSecretName), }, - ClientConnection: defaultClientConnection, - Integrations: overwriteNamespaceIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: overwriteNamespaceIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "should not default InternalCertManagement": { @@ -297,10 +306,11 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: ptr.To(false), }, - ClientConnection: defaultClientConnection, - Integrations: overwriteNamespaceIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: overwriteNamespaceIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "should not default values in custom ClientConnection": { @@ -324,9 +334,10 @@ func TestSetDefaults_Configuration(t *testing.T) { QPS: ptr.To[float32](123.0), Burst: ptr.To[int32](456), }, - Integrations: overwriteNamespaceIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + Integrations: overwriteNamespaceIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "should default empty custom ClientConnection": { @@ -343,10 +354,11 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: ptr.To(false), }, - ClientConnection: defaultClientConnection, - Integrations: overwriteNamespaceIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: overwriteNamespaceIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "defaulting waitForPodsReady values": { @@ -374,10 +386,11 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: ptr.To(false), }, - ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "set waitForPodsReady.blockAdmission to false when enable is false": { @@ -405,10 +418,11 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: ptr.To(false), }, - ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "respecting provided waitForPodsReady values": { @@ -442,10 +456,11 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: ptr.To(false), }, - ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "integrations": { @@ -468,8 +483,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Frameworks: []string{"a", "b"}, PodOptions: defaultIntegrations.PodOptions, }, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "queue visibility": { @@ -498,7 +514,8 @@ func TestSetDefaults_Configuration(t *testing.T) { MaxCount: 0, }, }, - MultiKueue: defaultMultiKueue, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "multiKueue": { @@ -526,6 +543,7 @@ func TestSetDefaults_Configuration(t *testing.T) { Origin: ptr.To("multikueue-manager1"), WorkerLostTimeout: &metav1.Duration{Duration: time.Minute}, }, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "multiKueue GCInterval 0": { @@ -552,6 +570,7 @@ func TestSetDefaults_Configuration(t *testing.T) { Origin: ptr.To("multikueue-manager1"), WorkerLostTimeout: &metav1.Duration{Duration: 15 * time.Minute}, }, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, }, "add default fair sharing configuration when enabled": { @@ -577,6 +596,31 @@ func TestSetDefaults_Configuration(t *testing.T) { Enable: true, PreemptionStrategies: []PreemptionStrategy{LessThanOrEqualToFinalShare, LessThanInitialShare}, }, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, + }, + }, + "set object retention policy for finished workloads": { + original: &Configuration{ + InternalCertManagement: &InternalCertManagement{ + Enable: ptr.To(false), + }, + ObjectRetentionPolicies: &ObjectRetentionPolicies{ + FinishedWorkloadRetention: &metav1.Duration{Duration: 30 * time.Minute}, + }, + }, + want: &Configuration{ + Namespace: ptr.To(DefaultNamespace), + ControllerManager: defaultCtrlManagerConfigurationSpec, + InternalCertManagement: &InternalCertManagement{ + Enable: ptr.To(false), + }, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: &ObjectRetentionPolicies{ + FinishedWorkloadRetention: &metav1.Duration{Duration: 30 * time.Minute}, + }, }, }, } diff --git a/charts/kueue/values.yaml b/charts/kueue/values.yaml index 0fa2ef3f9e..900b883a23 100644 --- a/charts/kueue/values.yaml +++ b/charts/kueue/values.yaml @@ -118,6 +118,8 @@ managerConfig: # preemptionStrategies: [LessThanOrEqualToFinalShare, LessThanInitialShare] #resources: # excludeResourcePrefixes: [] + #objectRetentionPolicies: + # finishedWorkloadRetention: null # null indicates infinite retention, 0s means no retention at all # ports definition for metricsService and webhookService. metricsService: ports: diff --git a/config/components/manager/controller_manager_config.yaml b/config/components/manager/controller_manager_config.yaml index c0a9f59731..5c089f6410 100644 --- a/config/components/manager/controller_manager_config.yaml +++ b/config/components/manager/controller_manager_config.yaml @@ -62,3 +62,5 @@ integrations: # preemptionStrategies: [LessThanOrEqualToFinalShare, LessThanInitialShare] #resources: # excludeResourcePrefixes: [] +#objectRetentionPolicies: +# finishedWorkloadRetention: null # null indicates infinite retention, 0s means no retention at all diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 8fccb24022..7cd8ca2995 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -298,6 +298,17 @@ webhook: t.Fatal(err) } + objectRetentionPoliciesConfig := filepath.Join(tmpDir, "objectRetentionPolicies.yaml") + if err := os.WriteFile(objectRetentionPoliciesConfig, []byte(` +apiVersion: config.kueue.x-k8s.io/v1beta1 +kind: Configuration +namespace: kueue-system +objectRetentionPolicies: + finishedWorkloadRetention: 30m +`), os.FileMode(0600)); err != nil { + t.Fatal(err) + } + defaultControlOptions := ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, Metrics: metricsserver.Options{ @@ -369,6 +380,8 @@ webhook: WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout}, } + defaultObjectRetentionPolicies := &configapi.ObjectRetentionPolicies{} + testcases := []struct { name string configFile string @@ -380,12 +393,13 @@ webhook: name: "default config", configFile: "", wantConfiguration: configapi.Configuration{ - Namespace: ptr.To(configapi.DefaultNamespace), - InternalCertManagement: enableDefaultInternalCertManagement, - ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + Namespace: ptr.To(configapi.DefaultNamespace), + InternalCertManagement: enableDefaultInternalCertManagement, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -441,8 +455,9 @@ webhook: PodSelector: &metav1.LabelSelector{}, }, }, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: defaultControlOptions, }, @@ -461,6 +476,7 @@ webhook: Integrations: defaultIntegrations, QueueVisibility: defaultQueueVisibility, MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: ":38081", @@ -495,10 +511,11 @@ webhook: WebhookServiceName: ptr.To("kueue-tenant-a-webhook-service"), WebhookSecretName: ptr.To("kueue-tenant-a-webhook-server-cert"), }, - ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: defaultControlOptions, }, @@ -515,10 +532,11 @@ webhook: InternalCertManagement: &configapi.InternalCertManagement{ Enable: ptr.To(false), }, - ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: defaultControlOptions, }, @@ -537,6 +555,7 @@ webhook: Integrations: defaultIntegrations, QueueVisibility: defaultQueueVisibility, MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: ctrl.Options{ @@ -579,10 +598,11 @@ webhook: BackoffMaxSeconds: ptr.To[int32](1800), }, }, - ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -617,9 +637,10 @@ webhook: QPS: ptr.To[float32](50), Burst: ptr.To[int32](100), }, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: defaultControlOptions, }, @@ -638,9 +659,10 @@ webhook: QPS: ptr.To[float32](50), Burst: ptr.To[int32](100), }, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -702,8 +724,9 @@ webhook: PodSelector: &metav1.LabelSelector{}, }, }, - QueueVisibility: defaultQueueVisibility, - MultiKueue: defaultMultiKueue, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -742,7 +765,8 @@ webhook: MaxCount: 0, }, }, - MultiKueue: defaultMultiKueue, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -800,7 +824,8 @@ webhook: }, }, }, - MultiKueue: defaultMultiKueue, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -839,6 +864,7 @@ webhook: Origin: ptr.To("multikueue-manager1"), WorkerLostTimeout: &metav1.Duration{Duration: 10 * time.Minute}, }, + ObjectRetentionPolicies: defaultObjectRetentionPolicies, }, wantOptions: defaultControlOptions, }, @@ -850,6 +876,27 @@ webhook: errors.New("unknown field \"namespaces\""), }), }, + { + name: "objectRetentionPolicies config", + configFile: objectRetentionPoliciesConfig, + wantConfiguration: configapi.Configuration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: configapi.GroupVersion.String(), + Kind: "Configuration", + }, + Namespace: ptr.To(configapi.DefaultNamespace), + ManageJobsWithoutQueueName: false, + InternalCertManagement: enableDefaultInternalCertManagement, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, + MultiKueue: defaultMultiKueue, + ObjectRetentionPolicies: &configapi.ObjectRetentionPolicies{ + FinishedWorkloadRetention: &metav1.Duration{Duration: 30 * time.Minute}, + }, + }, + wantOptions: defaultControlOptions, + }, } for _, tc := range testcases { @@ -962,6 +1009,9 @@ func TestEncode(t *testing.T) { "origin": "multikueue", "workerLostTimeout": "15m0s", }, + "objectRetentionPolicies": map[string]any{ + "finishedWorkloadRetention": nil, + }, }, }, } diff --git a/pkg/config/validation.go b/pkg/config/validation.go index 08220a0441..b66a12dfe2 100644 --- a/pkg/config/validation.go +++ b/pkg/config/validation.go @@ -57,6 +57,7 @@ var ( fsPreemptionStrategiesPath = field.NewPath("fairSharing", "preemptionStrategies") internalCertManagementPath = field.NewPath("internalCertManagement") queueVisibilityPath = field.NewPath("queueVisibility") + objectRetentionPoliciesPath = field.NewPath("objectRetentionPolicies") ) func validate(c *configapi.Configuration, scheme *runtime.Scheme) field.ErrorList { @@ -67,6 +68,7 @@ func validate(c *configapi.Configuration, scheme *runtime.Scheme) field.ErrorLis allErrs = append(allErrs, validateMultiKueue(c)...) allErrs = append(allErrs, validateFairSharing(c)...) allErrs = append(allErrs, validateInternalCertManagement(c)...) + allErrs = append(allErrs, validateObjectRetentionPolicies(c)...) return allErrs } @@ -278,3 +280,16 @@ func validateFairSharing(c *configapi.Configuration) field.ErrorList { } return allErrs } + +func validateObjectRetentionPolicies(c *configapi.Configuration) field.ErrorList { + var allErrs field.ErrorList + + rr := c.ObjectRetentionPolicies + if rr != nil { + if rr.FinishedWorkloadRetention != nil && rr.FinishedWorkloadRetention.Duration < 0 { + allErrs = append(allErrs, field.Invalid(objectRetentionPoliciesPath.Child("finishedWorkloadRetention"), + c.ObjectRetentionPolicies.FinishedWorkloadRetention, constants.IsNegativeErrorMsg)) + } + } + return allErrs +} diff --git a/pkg/controller/core/core.go b/pkg/controller/core/core.go index cb24eab9d7..cde018a69e 100644 --- a/pkg/controller/core/core.go +++ b/pkg/controller/core/core.go @@ -75,6 +75,7 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache mgr.GetEventRecorderFor(constants.WorkloadControllerName), WithWorkloadUpdateWatchers(qRec, cqRec), WithWaitForPodsReady(waitForPodsReady(cfg.WaitForPodsReady)), + WithWorkloadObjectRetention(cfg.ObjectRetentionPolicies.FinishedWorkloadRetention), ).SetupWithManager(mgr, cfg); err != nil { return "Workload", err } diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 052076189e..11e12a2063 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -71,6 +71,7 @@ type waitForPodsReadyConfig struct { type options struct { watchers []WorkloadUpdateWatcher waitForPodsReadyConfig *waitForPodsReadyConfig + objectRetention *metav1.Duration } // Option configures the reconciler. @@ -90,6 +91,13 @@ func WithWorkloadUpdateWatchers(value ...WorkloadUpdateWatcher) Option { } } +// WithWorkloadObjectRetention allows to specify retention for workload resources +func WithWorkloadObjectRetention(value *metav1.Duration) Option { + return func(o *options) { + o.objectRetention = value + } +} + var defaultOptions = options{} type WorkloadUpdateWatcher interface { @@ -106,6 +114,7 @@ type WorkloadReconciler struct { waitForPodsReady *waitForPodsReadyConfig recorder record.EventRecorder clock clock.Clock + objectRetention *metav1.Duration } func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache, recorder record.EventRecorder, opts ...Option) *WorkloadReconciler { @@ -123,6 +132,7 @@ func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *c waitForPodsReady: options.waitForPodsReadyConfig, recorder: recorder, clock: realClock, + objectRetention: options.objectRetention, } } @@ -144,11 +154,29 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c log.V(2).Info("Reconciling Workload") if len(wl.ObjectMeta.OwnerReferences) == 0 && !wl.DeletionTimestamp.IsZero() { + // manual deletion triggered by the user return ctrl.Result{}, workload.RemoveFinalizer(ctx, r.client, &wl) } - if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) { - return ctrl.Result{}, nil + finishedCond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadFinished) + if finishedCond != nil && finishedCond.Status == metav1.ConditionTrue { + if r.objectRetention == nil { + return ctrl.Result{}, nil + } + now := r.clock.Now() + expirationTime := finishedCond.LastTransitionTime.Add(r.objectRetention.Duration) + if now.After(expirationTime) { + log.V(2).Info("Deleting workload because it has finished and the retention period has elapsed", "retention", r.objectRetention.Duration) + if err := r.client.Delete(ctx, &wl); err != nil { + log.Error(err, "Failed to delete workload from the API server") + return ctrl.Result{}, fmt.Errorf("deleting workflow from the API server: %w", err) + } + r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Deleted", "Deleted finished workload due to elapsed retention: %v", workload.Key(&wl)) + } else { + remainingTime := expirationTime.Sub(now) + log.V(2).Info("Requeueing workload for deletion after retention period", "remainingTime", remainingTime) + return ctrl.Result{RequeueAfter: remainingTime}, nil + } } if workload.IsActive(&wl) { diff --git a/pkg/controller/core/workload_controller_test.go b/pkg/controller/core/workload_controller_test.go index 195574bebe..9da0cd54db 100644 --- a/pkg/controller/core/workload_controller_test.go +++ b/pkg/controller/core/workload_controller_test.go @@ -1314,6 +1314,95 @@ func TestReconcile(t *testing.T) { }). Obj(), }, + "shouldn't delete the workload because, object retention not configured": { + workload: utiltesting.MakeWorkload("wl", "ns"). + Condition(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + }). + Obj(), + reconcilerOpts: []Option{ + WithWorkloadObjectRetention(nil), + }, + wantWorkload: utiltesting.MakeWorkload("wl", "ns"). + Condition(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + }). + Obj(), + wantError: nil, + }, + "shouldn't try to delete the workload (no event emitted) because it is already being deleted by kubernetes, object retention configured": { + workload: utiltesting.MakeWorkload("wl", "ns"). + Condition(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.Time{ + Time: testStartTime, + }, + }). + DeletionTimestamp(testStartTime). + Finalizers(kueue.ResourceInUseFinalizerName). + Obj(), + reconcilerOpts: []Option{ + WithWorkloadObjectRetention(&metav1.Duration{ + Duration: 24 * time.Hour, + }), + }, + wantWorkload: nil, + wantError: nil, + }, + "shouldn't try to delete the workload because the retention period hasn't elapsed yet, object retention configured": { + workload: utiltesting.MakeWorkload("wl", "ns"). + Condition(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(testStartTime.Add(-10 * time.Second)), + }). + Obj(), + reconcilerOpts: []Option{ + WithWorkloadObjectRetention( + &metav1.Duration{ + Duration: 30 * time.Second, + }), + }, + wantWorkload: utiltesting.MakeWorkload("wl", "ns"). + Condition(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(testStartTime.Add(-10 * time.Second)), + }). + Obj(), + wantError: nil, + }, + "should delete the workload because the retention period has elapsed, object retention configured": { + workload: utiltesting.MakeWorkload("wl", "ns"). + Condition(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(testStartTime.Add(-40 * time.Second)), + }). + Obj(), + reconcilerOpts: []Option{ + WithWorkloadObjectRetention( + &metav1.Duration{ + Duration: 30 * time.Second, + }), + }, + wantWorkload: nil, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{ + Namespace: "ns", + Name: "wl", + }, + EventType: corev1.EventTypeNormal, + Reason: "Deleted", + Message: "Deleted finished workload due to elapsed retention: ns/wl", + }, + }, + wantError: nil, + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index 2be3625a87..338cbf05f2 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -603,7 +603,6 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o // If there is no matching workload and the job is running, suspend it. if match == nil && !job.IsSuspended() { - log.V(2).Info("job with no matching workload, suspending") var w *kueue.Workload if len(toDelete) == 1 { // The job may have been modified and hence the existing workload @@ -613,6 +612,7 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o } if _, _, finished := job.Finished(); !finished { + log.V(2).Info("job with no matching workload, suspending") var msg string if w == nil { msg = "Missing Workload; unable to restore pod templates" diff --git a/test/integration/controller/core/suite_test.go b/test/integration/controller/core/suite_test.go index 8738928ebc..364d22ff5c 100644 --- a/test/integration/controller/core/suite_test.go +++ b/test/integration/controller/core/suite_test.go @@ -20,9 +20,11 @@ import ( "context" "path/filepath" "testing" + "time" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -78,3 +80,32 @@ func managerSetup(ctx context.Context, mgr manager.Manager) { failedCtrl, err := core.SetupControllers(mgr, queues, cCache, controllersCfg) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) } + +func managerSetupWithWorkloadRetentionEnabled(ctx context.Context, mgr manager.Manager) { + objectRetention := &config.ObjectRetentionPolicies{FinishedWorkloadRetention: &v1.Duration{ + Duration: 3 * time.Second, + }} + + err := indexer.Setup(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + failedWebhook, err := webhooks.Setup(mgr) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook) + + controllersCfg := &config.Configuration{ObjectRetentionPolicies: objectRetention} + mgr.GetScheme().Default(controllersCfg) + + controllersCfg.Metrics.EnableClusterQueueResources = true + controllersCfg.QueueVisibility = &config.QueueVisibility{ + UpdateIntervalSeconds: 2, + ClusterQueues: &config.ClusterQueueVisibility{ + MaxCount: 3, + }, + } + + cCache := cache.New(mgr.GetClient()) + queues := queue.NewManager(mgr.GetClient(), cCache) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, controllersCfg) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) +} diff --git a/test/integration/controller/core/workload_controller_test.go b/test/integration/controller/core/workload_controller_test.go index 38a64f8624..ed64262859 100644 --- a/test/integration/controller/core/workload_controller_test.go +++ b/test/integration/controller/core/workload_controller_test.go @@ -18,6 +18,7 @@ package core import ( "fmt" + "time" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" @@ -429,3 +430,91 @@ var _ = ginkgo.Describe("Workload controller", ginkgo.Ordered, ginkgo.ContinueOn }) }) }) + +var _ = ginkgo.Describe("Workload controller with resource retention", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + var ( + ns *corev1.Namespace + createdWorkload kueue.Workload + localQueue *kueue.LocalQueue + clusterQueue *kueue.ClusterQueue + flavor *kueue.ResourceFlavor + ) + + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{CRDPath: crdPath, WebhookPath: webhookPath} + cfg = fwk.Init() + ctx, k8sClient = fwk.RunManager(cfg, managerSetupWithWorkloadRetentionEnabled) + }) + ginkgo.AfterAll(func() { + fwk.Teardown() + }) + + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "core-workload-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + }) + + ginkgo.AfterEach(func() { + createdWorkload = kueue.Workload{} + clusterQueue = nil + localQueue = nil + flavor = nil + }) + + ginkgo.When("a workload with retention period", func() { + ginkgo.BeforeEach(func() { + flavor = testing.MakeResourceFlavor(flavorOnDemand).Obj() + gomega.Expect(k8sClient.Create(ctx, flavor)).Should(gomega.Succeed()) + clusterQueue = testing.MakeClusterQueue("cq"). + ResourceGroup(*testing.MakeFlavorQuotas(flavorOnDemand). + Resource(corev1.ResourceCPU, "1").Obj()). + Obj() + gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed()) + localQueue = testing.MakeLocalQueue("q", ns.Name).ClusterQueue("cq").Obj() + gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed()) + }) + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + }) + + ginkgo.It("should delete the workload after retention period elapses", func() { + // Create workload + wl := testing.MakeWorkload("wl-to-expire", ns.Name).Queue("q").Obj() + wlKey := client.ObjectKeyFromObject(wl) + gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed()) + + // Simulate admission + admission := testing.MakeAdmission("cq").Obj() + gomega.Expect(k8sClient.Get(ctx, wlKey, &createdWorkload)).To(gomega.Succeed()) + gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, &createdWorkload, admission)).Should(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, &createdWorkload) + + // Mark as Finished + gomega.Expect(k8sClient.Get(ctx, wlKey, &createdWorkload)).To(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + createdWorkload.Status.Conditions = append(createdWorkload.Status.Conditions, metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.Now(), + Reason: "FinishedByTest", + Message: "Finished for testing purposes", + }) + g.Expect(k8sClient.Status().Update(ctx, &createdWorkload)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + ginkgo.By("workload should not be deleted before retention period") + gomega.Consistently(func() error { + return k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &createdWorkload) + }, 2*time.Second, time.Second).Should(gomega.Succeed()) + + ginkgo.By("workload should be deleted after the retention period") + gomega.Eventually(func() error { + return k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &createdWorkload) + }, 2*time.Second, time.Second).ShouldNot(gomega.Succeed()) + }) + }) +})