From 82a0eb9f02fe57b19146dd692cd2e9e2aed6b496 Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Thu, 25 Jan 2024 15:00:32 +0200
Subject: [PATCH 01/12] [multikueue] Add garbage collector.
---
apis/config/v1beta1/configuration_types.go | 10 ++
apis/config/v1beta1/defaults.go | 8 +
apis/config/v1beta1/defaults_test.go | 34 ++++
apis/config/v1beta1/zz_generated.deepcopy.go | 25 +++
apis/kueue/v1alpha1/multikueue_types.go | 8 +
.../kueue.x-k8s.io_multikueueclusters.yaml | 9 +
.../kueue/v1alpha1/multikueueclusterspec.go | 9 +
cmd/kueue/main.go | 2 +-
cmd/kueue/main_test.go | 1 +
.../kueue.x-k8s.io_multikueueclusters.yaml | 9 +
pkg/config/config_test.go | 45 +++++
.../admissionchecks/multikueue/controllers.go | 36 +++-
.../multikueue/jobset_adapter_test.go | 4 +-
.../multikueue/multikueuecluster.go | 87 +++++++++-
.../multikueue/multikueuecluster_test.go | 159 +++++++++++++++++-
.../admissionchecks/multikueue/workload.go | 12 +-
.../multikueue/workload_test.go | 33 ++--
pkg/util/testing/wrappers.go | 8 +
.../en/docs/reference/kueue-config.v1beta1.md | 31 ++++
.../integration/multikueue/multikueue_test.go | 58 +++++++
test/integration/multikueue/suite_test.go | 3 +-
21 files changed, 564 insertions(+), 27 deletions(-)
diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go
index 4eb963f5cd..6f34643969 100644
--- a/apis/config/v1beta1/configuration_types.go
+++ b/apis/config/v1beta1/configuration_types.go
@@ -67,6 +67,9 @@ type Configuration struct {
// QueueVisibility is configuration to expose the information about the top
// pending workloads.
QueueVisibility *QueueVisibility `json:"queueVisibility,omitempty"`
+
+ // MultiKueue controls the behaviour of the MultiKueue AdmissionCheck Controller.
+ MultiKueue *MultiKueue `json:"multiKueue,omitempty"`
}
type ControllerManager struct {
@@ -199,6 +202,13 @@ type WaitForPodsReady struct {
RequeuingTimestamp *RequeuingTimestamp `json:"requeuingTimestamp,omitempty"`
}
+// MultiKueue holds the configuration of the MultiKueue AdmissionCheck Controller.
+type MultiKueue struct {
+ // GCTimeout defines the timeout between two consecutive garbage collection runs.
+ // Defaults to 1min. If 0, the garbage collection is disabled.
+ // +optional
+ GCTimeout *metav1.Duration `json:"gcTimeout,omitempty"`
+}
+
type RequeuingTimestamp string
const (
diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go
index 084c38286f..0a56724d9f 100644
--- a/apis/config/v1beta1/defaults.go
+++ b/apis/config/v1beta1/defaults.go
@@ -44,6 +44,7 @@ const (
DefaultQueueVisibilityUpdateIntervalSeconds int32 = 5
DefaultClusterQueuesMaxCount int32 = 10
defaultJobFrameworkName = "batch/job"
+ DefaultMultiKueueGCTimeout = time.Minute
)
func getOperatorNamespace() string {
@@ -161,4 +162,11 @@ func SetDefaults_Configuration(cfg *Configuration) {
if cfg.Integrations.PodOptions.PodSelector == nil {
cfg.Integrations.PodOptions.PodSelector = &metav1.LabelSelector{}
}
+
+ if cfg.MultiKueue == nil {
+ cfg.MultiKueue = &MultiKueue{}
+ }
+ if cfg.MultiKueue.GCTimeout == nil {
+ cfg.MultiKueue.GCTimeout = &metav1.Duration{Duration: DefaultMultiKueueGCTimeout}
+ }
}
diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go
index 6c38811f4f..a720b6091d 100644
--- a/apis/config/v1beta1/defaults_test.go
+++ b/apis/config/v1beta1/defaults_test.go
@@ -96,6 +96,8 @@ func TestSetDefaults_Configuration(t *testing.T) {
},
}
+ defaultMultiKueue := &MultiKueue{GCTimeout: &metav1.Duration{Duration: DefaultMultiKueueGCTimeout}}
+
podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout}
podsReadyTimeoutOverwrite := metav1.Duration{Duration: time.Minute}
@@ -118,6 +120,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
},
"defaulting ControllerManager": {
@@ -158,6 +161,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
},
"should not default ControllerManager": {
@@ -214,6 +218,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
},
"should not set LeaderElectionID": {
@@ -254,6 +259,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
},
"defaulting InternalCertManagement": {
@@ -271,6 +277,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: overwriteNamespaceIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
},
"should not default InternalCertManagement": {
@@ -289,6 +296,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: overwriteNamespaceIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
},
"should not default values in custom ClientConnection": {
@@ -314,6 +322,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
},
Integrations: overwriteNamespaceIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
},
"should default empty custom ClientConnection": {
@@ -333,6 +342,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: overwriteNamespaceIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
},
"defaulting waitForPodsReady values": {
@@ -359,6 +369,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
},
"set waitForPodsReady.blockAdmission to false when enable is false": {
@@ -385,6 +396,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
},
"respecting provided waitForPodsReady values": {
@@ -413,6 +425,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
},
"integrations": {
@@ -436,6 +449,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
PodOptions: defaultIntegrations.PodOptions,
},
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
},
"queue visibility": {
@@ -464,6 +478,26 @@ func TestSetDefaults_Configuration(t *testing.T) {
MaxCount: 0,
},
},
+ MultiKueue: defaultMultiKueue,
+ },
+ },
+ "multiKueue": {
+ original: &Configuration{
+ InternalCertManagement: &InternalCertManagement{
+ Enable: ptr.To(false),
+ },
+ MultiKueue: &MultiKueue{GCTimeout: &metav1.Duration{Duration: time.Second}},
+ },
+ want: &Configuration{
+ Namespace: ptr.To(DefaultNamespace),
+ ControllerManager: defaultCtrlManagerConfigurationSpec,
+ InternalCertManagement: &InternalCertManagement{
+ Enable: ptr.To(false),
+ },
+ ClientConnection: defaultClientConnection,
+ Integrations: defaultIntegrations,
+ QueueVisibility: defaultQueueVisibility,
+ MultiKueue: &MultiKueue{GCTimeout: &metav1.Duration{Duration: time.Second}},
},
},
}
diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go
index 72cfc95185..d52585577a 100644
--- a/apis/config/v1beta1/zz_generated.deepcopy.go
+++ b/apis/config/v1beta1/zz_generated.deepcopy.go
@@ -103,6 +103,11 @@ func (in *Configuration) DeepCopyInto(out *Configuration) {
*out = new(QueueVisibility)
(*in).DeepCopyInto(*out)
}
+ if in.MultiKueue != nil {
+ in, out := &in.MultiKueue, &out.MultiKueue
+ *out = new(MultiKueue)
+ (*in).DeepCopyInto(*out)
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Configuration.
@@ -283,6 +288,26 @@ func (in *InternalCertManagement) DeepCopy() *InternalCertManagement {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *MultiKueue) DeepCopyInto(out *MultiKueue) {
+ *out = *in
+ if in.GCTimeout != nil {
+ in, out := &in.GCTimeout, &out.GCTimeout
+ *out = new(v1.Duration)
+ **out = **in
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MultiKueue.
+func (in *MultiKueue) DeepCopy() *MultiKueue {
+ if in == nil {
+ return nil
+ }
+ out := new(MultiKueue)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PodIntegrationOptions) DeepCopyInto(out *PodIntegrationOptions) {
*out = *in
diff --git a/apis/kueue/v1alpha1/multikueue_types.go b/apis/kueue/v1alpha1/multikueue_types.go
index e0357843d9..67bd990ebd 100644
--- a/apis/kueue/v1alpha1/multikueue_types.go
+++ b/apis/kueue/v1alpha1/multikueue_types.go
@@ -53,6 +53,14 @@ type KubeConfig struct {
type MultiKueueClusterSpec struct {
// Information how to connect to the cluster.
KubeConfig KubeConfig `json:"kubeConfig"`
+
+ // A label value used to track the creator of workloads in the worker cluster.
+ //
+ // +kubebuilder:validation:Optional
+ // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="Value is immutable"
+ // +kubebuilder:validation:MaxLength=63
+ // +kubebuilder:default=multikueue
+ Origin string `json:"origin,omitempty"`
}
type MultiKueueClusterStatus struct {
diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
index 6d8d1b3413..c6cb8ddacc 100644
--- a/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
+++ b/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
@@ -66,6 +66,15 @@ spec:
- location
- locationType
type: object
+ origin:
+ default: multikueue
+ description: A label value used to track the creator of workloads
+ in the worker cluster.
+ maxLength: 63
+ type: string
+ x-kubernetes-validations:
+ - message: Value is immutable
+ rule: self == oldSelf
required:
- kubeConfig
type: object
diff --git a/client-go/applyconfiguration/kueue/v1alpha1/multikueueclusterspec.go b/client-go/applyconfiguration/kueue/v1alpha1/multikueueclusterspec.go
index e7e304f7ea..d4c70f8e67 100644
--- a/client-go/applyconfiguration/kueue/v1alpha1/multikueueclusterspec.go
+++ b/client-go/applyconfiguration/kueue/v1alpha1/multikueueclusterspec.go
@@ -21,6 +21,7 @@ package v1alpha1
// with apply.
type MultiKueueClusterSpecApplyConfiguration struct {
KubeConfig *KubeConfigApplyConfiguration `json:"kubeConfig,omitempty"`
+ Origin *string `json:"origin,omitempty"`
}
// MultiKueueClusterSpecApplyConfiguration constructs an declarative configuration of the MultiKueueClusterSpec type for use with
@@ -36,3 +37,11 @@ func (b *MultiKueueClusterSpecApplyConfiguration) WithKubeConfig(value *KubeConf
b.KubeConfig = value
return b
}
+
+// WithOrigin sets the Origin field in the declarative configuration to the given value
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, the Origin field is set to the value of the last call.
+func (b *MultiKueueClusterSpecApplyConfiguration) WithOrigin(value string) *MultiKueueClusterSpecApplyConfiguration {
+ b.Origin = &value
+ return b
+}
diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go
index ef39b9f3e2..b2bfee611d 100644
--- a/cmd/kueue/main.go
+++ b/cmd/kueue/main.go
@@ -245,7 +245,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
}
if features.Enabled(features.MultiKueue) {
- if err := multikueue.SetupControllers(mgr, *cfg.Namespace); err != nil {
+ if err := multikueue.SetupControllers(mgr, *cfg.Namespace, multikueue.WithGCInterval(cfg.MultiKueue.GCTimeout.Duration)); err != nil {
setupLog.Error(err, "Could not setup MultiKueue controller")
os.Exit(1)
}
diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go
index 72da134fc2..7c883ccd2a 100644
--- a/cmd/kueue/main_test.go
+++ b/cmd/kueue/main_test.go
@@ -117,6 +117,7 @@ integrations:
MaxCount: config.DefaultClusterQueuesMaxCount,
},
},
+ MultiKueue: &config.MultiKueue{GCTimeout: &metav1.Duration{Duration: config.DefaultMultiKueueGCTimeout}},
},
},
{
diff --git a/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml b/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
index 837bfc144f..6e588d3dae 100644
--- a/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
+++ b/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
@@ -53,6 +53,15 @@ spec:
- location
- locationType
type: object
+ origin:
+ default: multikueue
+ description: A label value used to track the creator of workloads
+ in the worker cluster.
+ maxLength: 63
+ type: string
+ x-kubernetes-validations:
+ - message: Value is immutable
+ rule: self == oldSelf
required:
- kubeConfig
type: object
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 90ca95105d..461514cc75 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -258,6 +258,16 @@ integrations:
t.Fatal(err)
}
+ multiKueueConfig := filepath.Join(tmpDir, "multiKueue.yaml")
+ if err := os.WriteFile(multiKueueConfig, []byte(`
+apiVersion: config.kueue.x-k8s.io/v1beta1
+kind: Configuration
+namespace: kueue-system
+multiKueue:
+ gcTimeout: 1m30s
+`), os.FileMode(0600)); err != nil {
+ t.Fatal(err)
+ }
defaultControlOptions := ctrl.Options{
HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress,
Metrics: metricsserver.Options{
@@ -323,6 +333,8 @@ integrations:
},
}
+ defaultMultiKueue := &configapi.MultiKueue{GCTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCTimeout}}
+
testcases := []struct {
name string
configFile string
@@ -339,6 +351,7 @@ integrations:
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
wantOptions: ctrl.Options{
HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress,
@@ -395,6 +408,7 @@ integrations:
},
},
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
wantOptions: defaultControlOptions,
},
@@ -412,6 +426,7 @@ integrations:
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
wantOptions: ctrl.Options{
HealthProbeBindAddress: ":38081",
@@ -449,6 +464,7 @@ integrations:
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
wantOptions: defaultControlOptions,
},
@@ -468,6 +484,7 @@ integrations:
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
wantOptions: defaultControlOptions,
},
@@ -485,6 +502,7 @@ integrations:
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
wantOptions: ctrl.Options{
@@ -525,6 +543,7 @@ integrations:
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
wantOptions: ctrl.Options{
HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress,
@@ -561,6 +580,7 @@ integrations:
},
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
wantOptions: defaultControlOptions,
},
@@ -581,6 +601,7 @@ integrations:
},
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
wantOptions: ctrl.Options{
HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress,
@@ -642,6 +663,7 @@ integrations:
},
},
QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
},
wantOptions: ctrl.Options{
HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress,
@@ -680,6 +702,7 @@ integrations:
MaxCount: 0,
},
},
+ MultiKueue: defaultMultiKueue,
},
wantOptions: ctrl.Options{
HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress,
@@ -737,6 +760,7 @@ integrations:
},
},
},
+ MultiKueue: defaultMultiKueue,
},
wantOptions: ctrl.Options{
HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress,
@@ -756,6 +780,24 @@ integrations:
},
},
},
+ {
+ name: "multiKueue config",
+ configFile: multiKueueConfig,
+ wantConfiguration: configapi.Configuration{
+ TypeMeta: metav1.TypeMeta{
+ APIVersion: configapi.GroupVersion.String(),
+ Kind: "Configuration",
+ },
+ Namespace: ptr.To(configapi.DefaultNamespace),
+ ManageJobsWithoutQueueName: false,
+ InternalCertManagement: enableDefaultInternalCertManagement,
+ ClientConnection: defaultClientConnection,
+ Integrations: defaultIntegrations,
+ QueueVisibility: defaultQueueVisibility,
+ MultiKueue: &configapi.MultiKueue{GCTimeout: &metav1.Duration{Duration: 90 * time.Second}},
+ },
+ wantOptions: defaultControlOptions,
+ },
}
for _, tc := range testcases {
@@ -863,6 +905,9 @@ func TestEncode(t *testing.T) {
"updateIntervalSeconds": int64(configapi.DefaultQueueVisibilityUpdateIntervalSeconds),
"clusterQueues": map[string]any{"maxCount": int64(10)},
},
+ "multiKueue": map[string]any{
+ "gcTimeout": "1m0s",
+ },
},
},
}
diff --git a/pkg/controller/admissionchecks/multikueue/controllers.go b/pkg/controller/admissionchecks/multikueue/controllers.go
index 11009d6311..0691c2cfb1 100644
--- a/pkg/controller/admissionchecks/multikueue/controllers.go
+++ b/pkg/controller/admissionchecks/multikueue/controllers.go
@@ -16,15 +16,45 @@ limitations under the License.
package multikueue
-import ctrl "sigs.k8s.io/controller-runtime"
+import (
+ "time"
+
+ ctrl "sigs.k8s.io/controller-runtime"
+)
+
+const (
+ defaultGCInterval = time.Minute
+)
+
+// SetupOptions holds the optional settings applied by SetupControllers.
+type SetupOptions struct {
+ // gcInterval is the interval between two garbage collection runs, it is
+ // passed to the clusters reconciler and defaults to defaultGCInterval.
+ gcInterval time.Duration
+}
+
+// SetupOption is a function changing one setting of SetupOptions, it is
+// meant to be passed to SetupControllers.
+type SetupOption func(o *SetupOptions)
+
+// WithGCInterval returns a SetupOption that sets the interval between two
+// garbage collection runs. An interval of 0 disables the garbage collection.
+func WithGCInterval(i time.Duration) SetupOption {
+	var setInterval SetupOption = func(opts *SetupOptions) {
+		opts.gcInterval = i
+	}
+	return setInterval
+}
+
+func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
+ options := &SetupOptions{
+ gcInterval: defaultGCInterval,
+ }
+
+ for _, o := range opts {
+ o(options)
+ }
-func SetupControllers(mgr ctrl.Manager, namespace string) error {
helper, err := newMultiKueueStoreHelper(mgr.GetClient())
if err != nil {
return err
}
- cRec := newClustersReconciler(mgr.GetClient(), namespace)
+ cRec := newClustersReconciler(mgr.GetClient(), namespace, options.gcInterval)
err = cRec.setupWithManager(mgr)
if err != nil {
return err
diff --git a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
index 5d72f6e474..5ac3826e20 100644
--- a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
+++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
@@ -233,13 +233,13 @@ func TestWlReconcileJobset(t *testing.T) {
managerClient := manageBuilder.Build()
- cRec := newClustersReconciler(managerClient, TestNamespace)
+ cRec := newClustersReconciler(managerClient, TestNamespace, 0)
worker1Builder, _ := getClientBuilder()
worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.worker1Workloads}, &jobset.JobSetList{Items: tc.worker1JobSets})
worker1Client := worker1Builder.Build()
- w1remoteClient := newRemoteClient(managerClient, nil)
+ w1remoteClient := newRemoteClient(managerClient, nil, defaultOrigin)
w1remoteClient.client = worker1Client
cRec.remoteClients["worker1"] = w1remoteClient
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
index 08dfd05edd..f3c2a63358 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
@@ -23,12 +23,14 @@ import (
"os"
"strings"
"sync"
+ "time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/clientcmd"
@@ -45,6 +47,10 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)
+const (
+ defaultOrigin = "multikueue"
+)
+
type clientWithWatchBuilder func(config []byte, options client.Options) (client.WithWatch, error)
type remoteClient struct {
@@ -53,6 +59,7 @@ type remoteClient struct {
wlUpdateCh chan<- event.GenericEvent
watchCancel func()
kubeconfig []byte
+ origin string
// For unit testing only. There is now need of creating fully functional remote clients in the unit tests
// and creating valid kubeconfig content is not trivial.
@@ -60,10 +67,11 @@ type remoteClient struct {
builderOverride clientWithWatchBuilder
}
-func newRemoteClient(localClient client.Client, wlUpdateCh chan<- event.GenericEvent) *remoteClient {
+// newRemoteClient returns a remoteClient holding the local (manager) client,
+// the channel on which workload update events are sent and the origin label
+// value used to mark and find the workloads it creates in the remote cluster.
+// Only these fields are set here, the remote connection is not established.
+func newRemoteClient(localClient client.Client, wlUpdateCh chan<- event.GenericEvent, origin string) *remoteClient {
rc := &remoteClient{
wlUpdateCh: wlUpdateCh,
localClient: localClient,
+ origin: origin,
}
return rc
}
@@ -130,6 +138,44 @@ func (rc *remoteClient) queueWorkloadEvent(ctx context.Context, ev watch.Event)
}
}
+// runGC deletes the workloads of the remote cluster that were created with this
+// client's origin and no longer have a live counterpart in the local cluster.
+//
+// It lists the remote workloads labeled MultiKueueOriginLabelKey=rc.origin and,
+// for each one whose local workload (same namespace and name) is not found or is
+// pending deletion, deletes the object controlling the remote workload, when an
+// adapter is registered for its kind, and then the remote workload itself.
+// Errors are only logged and the run continues with the next workload.
+func (rc *remoteClient) runGC(ctx context.Context) {
+	log := ctrl.LoggerFrom(ctx)
+	lst := &kueue.WorkloadList{}
+	err := rc.client.List(ctx, lst, client.MatchingLabels{MultiKueueOriginLabelKey: rc.origin})
+	if err != nil {
+		log.V(2).Error(err, "Listing remote workloads")
+		return
+	}
+
+	for _, remoteWl := range lst.Items {
+		localWl := &kueue.Workload{}
+		err := rc.localClient.Get(ctx, client.ObjectKeyFromObject(&remoteWl), localWl)
+		if client.IgnoreNotFound(err) != nil {
+			log.V(2).Error(err, "Reading local workload")
+			continue
+		}
+
+		// Keep the remote workload while its local counterpart exists and is
+		// not pending deletion.
+		if err == nil && localWl.DeletionTimestamp.IsZero() {
+			continue
+		}
+
+		// If the remote workload has a controller, delete that as well.
+		if controller := metav1.GetControllerOf(&remoteWl); controller != nil {
+			adapterKey := schema.FromAPIVersionAndKind(controller.APIVersion, controller.Kind).String()
+			if adapter, found := adapters[adapterKey]; !found {
+				log.V(2).Info("No adapter found", "adapterKey", adapterKey, "controller", controller.Name)
+			} else {
+				err := adapter.DeleteRemoteObject(ctx, rc.client, types.NamespacedName{Name: controller.Name, Namespace: remoteWl.Namespace})
+				if client.IgnoreNotFound(err) != nil {
+					log.V(2).Error(err, "Deleting remote workload's owner", "remoteWl", klog.KObj(&remoteWl))
+				}
+			}
+		}
+
+		// The remote workload is deleted whether or not it has a controller,
+		// otherwise orphaned workloads without an owner are never collected.
+		if err := rc.client.Delete(ctx, &remoteWl); client.IgnoreNotFound(err) != nil {
+			log.V(2).Error(err, "Deleting remote workload", "remoteWl", klog.KObj(&remoteWl))
+		}
+	}
+}
+
// clustersReconciler implements the reconciler for all MultiKueueClusters.
// Its main task being to maintain the list of remote clients associated to each MultiKueueCluster.
type clustersReconciler struct {
@@ -141,6 +187,9 @@ type clustersReconciler struct {
remoteClients map[string]*remoteClient
wlUpdateCh chan event.GenericEvent
+ // gcInterval - time waiting between two GC runs.
+ gcInterval time.Duration
+
// rootContext - holds the context passed by the controller-runtime on Start.
// It's used to create child contexts for MultiKueueClusters client watch routines
// that will gracefully end when the controller-manager stops.
@@ -157,6 +206,7 @@ var _ reconcile.Reconciler = (*clustersReconciler)(nil)
func (c *clustersReconciler) Start(ctx context.Context) error {
c.rootContext = ctx
+ go c.runGC(ctx)
return nil
}
@@ -169,13 +219,13 @@ func (c *clustersReconciler) stopAndRemoveCluster(clusterName string) {
}
}
-func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, kubeconfig []byte) error {
+func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, kubeconfig []byte, origin string) error {
c.lock.Lock()
defer c.lock.Unlock()
client, found := c.remoteClients[clusterName]
if !found {
- client = newRemoteClient(c.localClient, c.wlUpdateCh)
+ client = newRemoteClient(c.localClient, c.wlUpdateCh, origin)
if c.builderOverride != nil {
client.builderOverride = c.builderOverride
}
@@ -225,7 +275,12 @@ func (c *clustersReconciler) Reconcile(ctx context.Context, req reconcile.Reques
return reconcile.Result{}, c.updateStatus(ctx, cluster, false, "BadConfig", err.Error())
}
- if err := c.setRemoteClientConfig(ctx, cluster.Name, kubeConfig); err != nil {
+ origin := cluster.Spec.Origin
+ if len(origin) == 0 {
+ origin = defaultOrigin
+ }
+
+ if err := c.setRemoteClientConfig(ctx, cluster.Name, kubeConfig, origin); err != nil {
log.Error(err, "setting kubeconfig")
return reconcile.Result{}, c.updateStatus(ctx, cluster, false, "ClientConnectionFailed", err.Error())
}
@@ -285,16 +340,38 @@ func (c *clustersReconciler) updateStatus(ctx context.Context, cluster *kueuealp
return c.localClient.Status().Update(ctx, cluster)
}
+// runGC runs the garbage collection of every known remote client once per
+// gcInterval, until ctx is done. It is started in its own goroutine by Start.
+//
+// A gcInterval of 0 disables the garbage collection. A negative one is treated
+// the same way, since time.After would fire immediately and turn the loop into
+// a busy loop.
+func (c *clustersReconciler) runGC(ctx context.Context) {
+	log := ctrl.LoggerFrom(ctx)
+	if c.gcInterval <= 0 {
+		log.V(2).Info("Garbage Collection is disabled")
+		return
+	}
+	log.V(2).Info("Starting Garbage Collector")
+	for {
+		select {
+		case <-ctx.Done():
+			log.V(2).Info("Garbage Collector Stopped")
+			return
+		case <-time.After(c.gcInterval):
+			log.V(5).Info("Run Garbage Collection")
+
+			// Snapshot the clients under the lock: the map is changed by the
+			// reconciler while holding c.lock, and the remote calls done by
+			// the garbage collection should not run with the lock held.
+			c.lock.Lock()
+			remoteClients := make(map[string]*remoteClient, len(c.remoteClients))
+			for clusterName, rc := range c.remoteClients {
+				remoteClients[clusterName] = rc
+			}
+			c.lock.Unlock()
+
+			for clusterName, rc := range remoteClients {
+				rc.runGC(ctrl.LoggerInto(ctx, log.WithValues("multiKueueCluster", clusterName)))
+			}
+		}
+	}
+}
+
// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=multikueueclusters,verbs=get;list;watch
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=multikueueclusters/status,verbs=get;update;patch
-func newClustersReconciler(c client.Client, namespace string) *clustersReconciler {
+// newClustersReconciler returns a clustersReconciler using c as the local
+// client and namespace as the config namespace, with an empty set of remote
+// clients, a workload update channel buffered for 10 events and gcInterval as
+// the time waiting between two garbage collection runs.
+func newClustersReconciler(c client.Client, namespace string, gcInterval time.Duration) *clustersReconciler {
return &clustersReconciler{
localClient: c,
configNamespace: namespace,
remoteClients: make(map[string]*remoteClient),
wlUpdateCh: make(chan event.GenericEvent, 10),
+ gcInterval: gcInterval,
}
}
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
index 8d2dea6f00..59f4eb94c7 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
@@ -22,6 +22,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
+ batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -29,8 +30,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
+ kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/util/slices"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
+ testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
)
var (
@@ -191,7 +194,7 @@ func TestUpdateConfig(t *testing.T) {
builder = builder.WithStatusSubresource(slices.Map(tc.clusters, func(c *kueuealpha.MultiKueueCluster) client.Object { return c })...)
c := builder.Build()
- reconciler := newClustersReconciler(c, TestNamespace)
+ reconciler := newClustersReconciler(c, TestNamespace, 0)
if len(tc.remoteClients) > 0 {
reconciler.remoteClients = tc.remoteClients
@@ -222,3 +225,157 @@ func TestUpdateConfig(t *testing.T) {
})
}
}
+
+// TestRemoteClientGC checks remoteClient.runGC against a fake manager and a
+// fake worker cluster: a worker workload carrying the client's origin label is
+// kept while the manager has a workload with the same key, it is deleted
+// together with its owner job when the manager has none, and workloads labeled
+// with another origin are left untouched.
+func TestRemoteClientGC(t *testing.T) {
+	baseJobBuilder := testingjob.MakeJob("job1", TestNamespace)
+	baseWlBuilder := utiltesting.MakeWorkload("wl1", TestNamespace).OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "test-uuid", true, true)
+
+	cases := map[string]struct {
+		managersWorkloads []kueue.Workload
+		workersWorkloads  []kueue.Workload
+		managersJobs      []batchv1.Job
+		workersJobs       []batchv1.Job
+
+		wantManagersWorkloads []kueue.Workload
+		wantWorkersWorkloads  []kueue.Workload
+		wantManagersJobs      []batchv1.Job
+		wantWorkersJobs       []batchv1.Job
+	}{
+		"existing workers and jobs are not deleted": {
+			managersWorkloads: []kueue.Workload{
+				*baseWlBuilder.Clone().
+					Obj(),
+			},
+			workersWorkloads: []kueue.Workload{
+				*baseWlBuilder.Clone().
+					Label(MultiKueueOriginLabelKey, defaultOrigin).
+					Obj(),
+			},
+			managersJobs: []batchv1.Job{
+				*baseJobBuilder.Clone().
+					Obj(),
+			},
+			workersJobs: []batchv1.Job{
+				*baseJobBuilder.Clone().
+					Obj(),
+			},
+			wantManagersWorkloads: []kueue.Workload{
+				*baseWlBuilder.Clone().
+					Obj(),
+			},
+			wantWorkersWorkloads: []kueue.Workload{
+				*baseWlBuilder.Clone().
+					Label(MultiKueueOriginLabelKey, defaultOrigin).
+					Obj(),
+			},
+			wantManagersJobs: []batchv1.Job{
+				*baseJobBuilder.Clone().
+					Obj(),
+			},
+			wantWorkersJobs: []batchv1.Job{
+				*baseJobBuilder.Clone().
+					Obj(),
+			},
+		},
+		"missing workers and their owner jobs are deleted": {
+			workersWorkloads: []kueue.Workload{
+				*baseWlBuilder.Clone().
+					Label(MultiKueueOriginLabelKey, defaultOrigin).
+					Obj(),
+			},
+			managersJobs: []batchv1.Job{
+				*baseJobBuilder.Clone().
+					Obj(),
+			},
+			workersJobs: []batchv1.Job{
+				*baseJobBuilder.Clone().
+					Obj(),
+			},
+			wantManagersJobs: []batchv1.Job{
+				*baseJobBuilder.Clone().
+					Obj(),
+			},
+		},
+		"unrelated workers and jobs are not deleted": {
+			workersWorkloads: []kueue.Workload{
+				*baseWlBuilder.Clone().
+					Label(MultiKueueOriginLabelKey, "other-gc-key").
+					Obj(),
+			},
+			workersJobs: []batchv1.Job{
+				*baseJobBuilder.Clone().
+					Obj(),
+			},
+			wantWorkersWorkloads: []kueue.Workload{
+				*baseWlBuilder.Clone().
+					Label(MultiKueueOriginLabelKey, "other-gc-key").
+					Obj(),
+			},
+			wantWorkersJobs: []batchv1.Job{
+				*baseJobBuilder.Clone().
+					Obj(),
+			},
+		},
+	}
+
+	// The resource version is set by the fake clients, and nil and empty lists
+	// are equivalent for the purpose of this test.
+	objCheckOpts := []cmp.Option{
+		cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"),
+		cmpopts.EquateEmpty(),
+	}
+
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			manageBuilder, ctx := getClientBuilder()
+			manageBuilder = manageBuilder.WithLists(&kueue.WorkloadList{Items: tc.managersWorkloads}, &batchv1.JobList{Items: tc.managersJobs})
+			managerClient := manageBuilder.Build()
+
+			worker1Builder, _ := getClientBuilder()
+			worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.workersWorkloads}, &batchv1.JobList{Items: tc.workersJobs})
+			worker1Client := worker1Builder.Build()
+
+			// Inject the fake worker client instead of building one from a kubeconfig.
+			w1remoteClient := newRemoteClient(managerClient, nil, defaultOrigin)
+			w1remoteClient.client = worker1Client
+
+			w1remoteClient.runGC(ctx)
+
+			gotManagersWorkloads := &kueue.WorkloadList{}
+			if err := managerClient.List(ctx, gotManagersWorkloads); err != nil {
+				t.Errorf("unexpected list manager's workloads error: %v", err)
+			}
+
+			if diff := cmp.Diff(tc.wantManagersWorkloads, gotManagersWorkloads.Items, objCheckOpts...); diff != "" {
+				t.Errorf("unexpected manager's workloads (-want/+got):\n%s", diff)
+			}
+
+			gotWorker1Workloads := &kueue.WorkloadList{}
+			if err := worker1Client.List(ctx, gotWorker1Workloads); err != nil {
+				t.Errorf("unexpected list worker's workloads error: %v", err)
+			}
+
+			if diff := cmp.Diff(tc.wantWorkersWorkloads, gotWorker1Workloads.Items, objCheckOpts...); diff != "" {
+				t.Errorf("unexpected worker's workloads (-want/+got):\n%s", diff)
+			}
+
+			gotManagersJobs := &batchv1.JobList{}
+			if err := managerClient.List(ctx, gotManagersJobs); err != nil {
+				t.Errorf("unexpected list manager's jobs error: %v", err)
+			}
+
+			if diff := cmp.Diff(tc.wantManagersJobs, gotManagersJobs.Items, objCheckOpts...); diff != "" {
+				t.Errorf("unexpected manager's jobs (-want/+got):\n%s", diff)
+			}
+
+			gotWorker1Jobs := &batchv1.JobList{}
+			if err := worker1Client.List(ctx, gotWorker1Jobs); err != nil {
+				t.Errorf("unexpected list worker's jobs error: %v", err)
+			}
+
+			if diff := cmp.Diff(tc.wantWorkersJobs, gotWorker1Jobs.Items, objCheckOpts...); diff != "" {
+				t.Errorf("unexpected worker's jobs (-want/+got):\n%s", diff)
+			}
+		})
+	}
+}
diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go
index 0d10fbf44b..e805a821da 100644
--- a/pkg/controller/admissionchecks/multikueue/workload.go
+++ b/pkg/controller/admissionchecks/multikueue/workload.go
@@ -52,6 +52,10 @@ var (
errNoActiveClusters = errors.New("no active clusters")
)
+const (
+ MultiKueueOriginLabelKey = "kueue.x-k8s.io/multikueue-origin"
+)
+
type wlReconciler struct {
client client.Client
helper *multiKueueStoreHelper
@@ -329,7 +333,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error
// finally - create missing workloads
for rem, remWl := range group.remotes {
if remWl == nil {
- clone := cloneForCreate(group.local)
+ clone := cloneForCreate(group.local, group.remoteClients[rem].origin)
err := group.remoteClients[rem].client.Create(ctx, clone)
if err != nil {
// just log the error for a single remote
@@ -375,9 +379,13 @@ func cleanObjectMeta(orig *metav1.ObjectMeta) metav1.ObjectMeta {
}
}
-func cloneForCreate(orig *kueue.Workload) *kueue.Workload {
+func cloneForCreate(orig *kueue.Workload, origin string) *kueue.Workload {
remoteWl := &kueue.Workload{}
remoteWl.ObjectMeta = cleanObjectMeta(&orig.ObjectMeta)
+ if remoteWl.Labels == nil {
+ remoteWl.Labels = make(map[string]string)
+ }
+ remoteWl.Labels[MultiKueueOriginLabelKey] = origin
orig.Spec.DeepCopyInto(&remoteWl.Spec)
return remoteWl
}
diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go
index 70744d1a86..4993a16a80 100644
--- a/pkg/controller/admissionchecks/multikueue/workload_test.go
+++ b/pkg/controller/admissionchecks/multikueue/workload_test.go
@@ -95,7 +95,9 @@ func TestWlReconcile(t *testing.T) {
Obj(),
},
worker1Workloads: []kueue.Workload{
- *baseWorkloadBuilder.Clone().Obj(),
+ *baseWorkloadBuilder.Clone().
+ Label(MultiKueueOriginLabelKey, defaultOrigin).
+ Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
@@ -121,7 +123,9 @@ func TestWlReconcile(t *testing.T) {
Obj(),
},
wantWorker1Workloads: []kueue.Workload{
- *baseWorkloadBuilder.Clone().Obj(),
+ *baseWorkloadBuilder.Clone().
+ Label(MultiKueueOriginLabelKey, defaultOrigin).
+ Obj(),
},
},
"remote wl with reservation": {
@@ -141,6 +145,7 @@ func TestWlReconcile(t *testing.T) {
worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
+ Label(MultiKueueOriginLabelKey, defaultOrigin).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
@@ -160,6 +165,7 @@ func TestWlReconcile(t *testing.T) {
wantWorker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
+ Label(MultiKueueOriginLabelKey, defaultOrigin).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
@@ -196,6 +202,7 @@ func TestWlReconcile(t *testing.T) {
worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
+ Label(MultiKueueOriginLabelKey, defaultOrigin).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: "by test"}).
Obj(),
@@ -220,6 +227,7 @@ func TestWlReconcile(t *testing.T) {
wantWorker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
+ Label(MultiKueueOriginLabelKey, defaultOrigin).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: "by test"}).
Obj(),
@@ -261,6 +269,7 @@ func TestWlReconcile(t *testing.T) {
worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
+ Label(MultiKueueOriginLabelKey, defaultOrigin).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: "by test"}).
Obj(),
@@ -301,13 +310,13 @@ func TestWlReconcile(t *testing.T) {
managerClient := manageBuilder.Build()
- cRec := newClustersReconciler(managerClient, TestNamespace)
+ cRec := newClustersReconciler(managerClient, TestNamespace, 0)
worker1Builder, _ := getClientBuilder()
worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.worker1Workloads}, &batchv1.JobList{Items: tc.worker1Jobs})
worker1Client := worker1Builder.Build()
- w1remoteClient := newRemoteClient(managerClient, nil)
+ w1remoteClient := newRemoteClient(managerClient, nil, defaultOrigin)
w1remoteClient.client = worker1Client
cRec.remoteClients["worker1"] = w1remoteClient
@@ -322,40 +331,40 @@ func TestWlReconcile(t *testing.T) {
gotManagersWokloads := &kueue.WorkloadList{}
err := managerClient.List(ctx, gotManagersWokloads)
if err != nil {
- t.Error("unexpected list managers workloads error")
+ t.Error("unexpected list manager's workloads error")
}
if diff := cmp.Diff(tc.wantManagersWorkloads, gotManagersWokloads.Items, objCheckOpts...); diff != "" {
- t.Errorf("unexpected manangers workloads (-want/+got):\n%s", diff)
+ t.Errorf("unexpected manager's workloads (-want/+got):\n%s", diff)
}
gotWorker1Wokloads := &kueue.WorkloadList{}
err = worker1Client.List(ctx, gotWorker1Wokloads)
if err != nil {
- t.Error("unexpected list managers workloads error")
+ t.Error("unexpected list worker's workloads error")
}
if diff := cmp.Diff(tc.wantWorker1Workloads, gotWorker1Wokloads.Items, objCheckOpts...); diff != "" {
- t.Errorf("unexpected manangers workloads (-want/+got):\n%s", diff)
+ t.Errorf("unexpected worker's workloads (-want/+got):\n%s", diff)
}
gotManagersJobs := &batchv1.JobList{}
err = managerClient.List(ctx, gotManagersJobs)
if err != nil {
- t.Error("unexpected list managers jobs error")
+ t.Error("unexpected list manager's jobs error")
}
if diff := cmp.Diff(tc.wantManagersJobs, gotManagersJobs.Items, objCheckOpts...); diff != "" {
- t.Errorf("unexpected manangers jobs (-want/+got):\n%s", diff)
+ t.Errorf("unexpected manager's jobs (-want/+got):\n%s", diff)
}
gotWorker1Job := &batchv1.JobList{}
err = worker1Client.List(ctx, gotWorker1Job)
if err != nil {
- t.Error("unexpected list managers jobs error")
+ t.Error("unexpected list worker's jobs error")
}
if diff := cmp.Diff(tc.wantWorker1Jobs, gotWorker1Job.Items, objCheckOpts...); diff != "" {
- t.Errorf("unexpected worker1 jobs (-want/+got):\n%s", diff)
+ t.Errorf("unexpected worker's jobs (-want/+got):\n%s", diff)
}
})
}
diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go
index 35aa62b503..f7a9be5213 100644
--- a/pkg/util/testing/wrappers.go
+++ b/pkg/util/testing/wrappers.go
@@ -213,6 +213,14 @@ func (w *WorkloadWrapper) Labels(l map[string]string) *WorkloadWrapper {
return w
}
+func (w *WorkloadWrapper) Label(k, v string) *WorkloadWrapper {
+ if w.ObjectMeta.Labels == nil {
+ w.ObjectMeta.Labels = make(map[string]string)
+ }
+ w.ObjectMeta.Labels[k] = v
+ return w
+}
+
func (w *WorkloadWrapper) AdmissionChecks(checks ...kueue.AdmissionCheckState) *WorkloadWrapper {
w.Status.AdmissionChecks = checks
return w
diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md
index 772b30bb67..06f49ab03f 100644
--- a/site/content/en/docs/reference/kueue-config.v1beta1.md
+++ b/site/content/en/docs/reference/kueue-config.v1beta1.md
@@ -156,6 +156,13 @@ integrations (including K8S job).
pending workloads.
+multiKueue [Required]
+MultiKueue
+ |
+
+ MultiKueue controls the behaviour of the MultiKueue AdmissionCheck Controller.
+ |
+
@@ -473,6 +480,30 @@ Defaults to kueue-webhook-server-cert.
+## `MultiKueue` {#MultiKueue}
+
+
+**Appears in:**
+
+
+
+
+
+Field | Description |
+
+
+
+gcTimeout
+k8s.io/apimachinery/pkg/apis/meta/v1.Duration
+ |
+
+ GCTimeout defines the timeout between two consecutive garbage collection runs.
+Defaults to 1min. If 0, the garbage collection is disabled.
+ |
+
+
+
+
## `PodIntegrationOptions` {#PodIntegrationOptions}
diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go
index 0d5a015d31..ee5f2ecb2d 100644
--- a/test/integration/multikueue/multikueue_test.go
+++ b/test/integration/multikueue/multikueue_test.go
@@ -488,4 +488,62 @@ var _ = ginkgo.Describe("Multikueue", func() {
})
})
+ ginkgo.It("Should remove the worker's workload and job when managers job is deleted", func() {
+ job := testingjob.MakeJob("job", managerNs.Name).
+ Queue(managerLq.Name).
+ Obj()
+ gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, job)).Should(gomega.Succeed())
+
+ createdWorkload := &kueue.Workload{}
+ wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name), Namespace: managerNs.Name}
+
+ ginkgo.By("setting workload reservation in the management cluster", func() {
+ admission := utiltesting.MakeAdmission(managerCq.Name).Obj()
+ gomega.Eventually(func(g gomega.Gomega) {
+ g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
+ g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed())
+ }, util.Timeout, util.Interval).Should(gomega.Succeed())
+ })
+
+ ginkgo.By("checking the workload creation in the worker clusters", func() {
+ managerWl := &kueue.Workload{}
+ gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
+ gomega.Eventually(func(g gomega.Gomega) {
+ g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
+ g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
+ g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
+ g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
+ }, util.Timeout, util.Interval).Should(gomega.Succeed())
+ })
+
+ ginkgo.By("setting workload reservation in worker1, the job is created in worker1", func() {
+ admission := utiltesting.MakeAdmission(managerCq.Name).Obj()
+
+ gomega.Eventually(func(g gomega.Gomega) {
+ g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
+ g.Expect(util.SetQuotaReservation(worker1TestCluster.ctx, worker1TestCluster.client, createdWorkload, admission)).To(gomega.Succeed())
+ }, util.Timeout, util.Interval).Should(gomega.Succeed())
+
+ gomega.Eventually(func(g gomega.Gomega) {
+ createdJob := batchv1.Job{}
+ g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed())
+ }, util.Timeout, util.Interval).Should(gomega.Succeed())
+ })
+
+ ginkgo.By("removing the managers job and workload, the workload and job in worker1 are removed", func() {
+ gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, job)).Should(gomega.Succeed())
+ gomega.Eventually(func(g gomega.Gomega) {
+ g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
+ g.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, createdWorkload)).To(gomega.Succeed())
+
+ }, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+
+ gomega.Eventually(func(g gomega.Gomega) {
+ createdJob := batchv1.Job{}
+ g.Expect(worker1TestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError())
+ g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed())
+ }, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+ })
+ })
+
})
diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go
index 383b56beda..ed145ed1fd 100644
--- a/test/integration/multikueue/suite_test.go
+++ b/test/integration/multikueue/suite_test.go
@@ -20,6 +20,7 @@ import (
"context"
"path/filepath"
"testing"
+ "time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
@@ -172,6 +173,6 @@ func managerAndMultiKueueSetup(mgr manager.Manager, ctx context.Context) {
err := multikueue.SetupIndexer(ctx, mgr.GetFieldIndexer(), managersConfigNamespace.Name)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
- err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name)
+ err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name, multikueue.WithGCInterval(2*time.Second))
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
From f035491a337eac8321b8087bbc30d3c386153de9 Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Mon, 29 Jan 2024 10:51:43 +0200
Subject: [PATCH 02/12] Fix after rebase
---
pkg/controller/admissionchecks/multikueue/multikueuecluster.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
index f3c2a63358..616796320a 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
@@ -354,7 +354,7 @@ func (c *clustersReconciler) runGC(ctx context.Context) {
return
case <-time.After(c.gcInterval):
log.V(5).Info("Run Garbage Collection")
- for clusterName, rc := range c.clients {
+ for clusterName, rc := range c.remoteClients {
rc.runGC(ctrl.LoggerInto(ctx, log.WithValues("multiKueueCluster", clusterName)))
}
}
From 12bbde9488f03b88646ee1ce4cf74a628d1568c2 Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Wed, 31 Jan 2024 13:41:00 +0200
Subject: [PATCH 03/12] Review Remarks
---
.../multikueue/multikueuecluster.go | 39 +++++++++++--------
1 file changed, 23 insertions(+), 16 deletions(-)
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
index 616796320a..ebe383e8f4 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
@@ -138,6 +138,9 @@ func (rc *remoteClient) queueWorkloadEvent(ctx context.Context, ev watch.Event)
}
}
+// runGC - lists all the remote workloads having the same multikueue-origin and removes those that
+// no longer have a local counterpart (missing or awaiting deletion). If the remote workload
+// is owned by a job, also delete the job.
func (rc *remoteClient) runGC(ctx context.Context) {
log := ctrl.LoggerFrom(ctx)
lst := &kueue.WorkloadList{}
@@ -149,30 +152,34 @@ func (rc *remoteClient) runGC(ctx context.Context) {
for _, remoteWl := range lst.Items {
localWl := &kueue.Workload{}
+ wlLog := log.WithValues("remoteWl", klog.KObj(&remoteWl))
err := rc.localClient.Get(ctx, client.ObjectKeyFromObject(&remoteWl), localWl)
if client.IgnoreNotFound(err) != nil {
- log.V(2).Error(err, "Reading local workload")
+ wlLog.V(2).Error(err, "Reading local workload")
continue
}
- // if it's not found, or pending deletion
- if err != nil || !localWl.DeletionTimestamp.IsZero() {
- // if the remote wl has a controller, delete that as well
- if controller := metav1.GetControllerOf(&remoteWl); controller != nil {
- adapterKey := schema.FromAPIVersionAndKind(controller.APIVersion, controller.Kind).String()
- if adapter, found := adapters[adapterKey]; !found {
- log.V(2).Info("No adapter found", "adapterKey", adapterKey, "controller", controller.Name)
- } else {
- err := adapter.DeleteRemoteObject(ctx, rc.client, types.NamespacedName{Name: controller.Name, Namespace: remoteWl.Namespace})
- if client.IgnoreNotFound(err) != nil {
- log.V(2).Error(err, "Deleting remote workload's owner", "remoteWl", klog.KObj(&remoteWl))
- }
- }
- if err := rc.client.Delete(ctx, &remoteWl); client.IgnoreNotFound(err) != nil {
- log.V(2).Error(err, "Deleting remote workload", "remoteWl", klog.KObj(&remoteWl))
+ if err == nil && localWl.DeletionTimestamp.IsZero() {
+ // The remote workload is still relevant.
+ continue
+ }
+
+ // if the remote wl has a controller(owning Job), delete the job
+ if controller := metav1.GetControllerOf(&remoteWl); controller != nil {
+ ownerKey := types.NamespacedName{Name: controller.Name, Namespace: remoteWl.Namespace}
+ adapterKey := schema.FromAPIVersionAndKind(controller.APIVersion, controller.Kind).String()
+ if adapter, found := adapters[adapterKey]; !found {
+ wlLog.V(2).Info("No adapter found", "adapterKey", adapterKey, "ownerKey", ownerKey)
+ } else {
+ err := adapter.DeleteRemoteObject(ctx, rc.client, ownerKey)
+ if client.IgnoreNotFound(err) != nil {
+ wlLog.V(2).Error(err, "Deleting remote workload's owner", "ownerKey", ownerKey)
}
}
}
+ if err := rc.client.Delete(ctx, &remoteWl); client.IgnoreNotFound(err) != nil {
+ wlLog.V(2).Error(err, "Deleting remote workload")
+ }
}
}
From deb9bebfe878b223488074dd2cd1ee4377feb1f7 Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Thu, 1 Feb 2024 09:06:40 +0200
Subject: [PATCH 04/12] Extend API description.
---
apis/kueue/v1alpha1/multikueue_types.go | 2 ++
.../templates/crd/kueue.x-k8s.io_multikueueclusters.yaml | 4 +++-
.../crd/bases/kueue.x-k8s.io_multikueueclusters.yaml | 4 +++-
3 files changed, 8 insertions(+), 2 deletions(-)
diff --git a/apis/kueue/v1alpha1/multikueue_types.go b/apis/kueue/v1alpha1/multikueue_types.go
index 67bd990ebd..af6eebca7b 100644
--- a/apis/kueue/v1alpha1/multikueue_types.go
+++ b/apis/kueue/v1alpha1/multikueue_types.go
@@ -55,6 +55,8 @@ type MultiKueueClusterSpec struct {
KubeConfig KubeConfig `json:"kubeConfig"`
// A label value used to track the creator of workloads in the worker cluster.
+ // This is used by the garbage collector to identify remote objects that ware created via
+ // this cluster and delete them if their local counterpart no longer exists.
//
// +kubebuilder:validation:Optional
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="Value is immutable"
diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
index c6cb8ddacc..4a172cacda 100644
--- a/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
+++ b/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
@@ -69,7 +69,9 @@ spec:
origin:
default: multikueue
description: A label value used to track the creator of workloads
- in the worker cluster.
+ in the worker cluster. This is used by the garbage collector to
+ identify remote objects that ware created via this cluster and delete
+ them if their local counterpart no longer exists.
maxLength: 63
type: string
x-kubernetes-validations:
diff --git a/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml b/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
index 6e588d3dae..fb704666dc 100644
--- a/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
+++ b/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
@@ -56,7 +56,9 @@ spec:
origin:
default: multikueue
description: A label value used to track the creator of workloads
- in the worker cluster.
+ in the worker cluster. This is used by the garbage collector to
+ identify remote objects that ware created via this cluster and delete
+ them if their local counterpart no longer exists.
maxLength: 63
type: string
x-kubernetes-validations:
From 73131ed378ff45e602003e9fab57eff0ec525b4e Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Fri, 2 Feb 2024 09:20:28 +0200
Subject: [PATCH 05/12] Make Origin a global config.
---
apis/config/v1beta1/configuration_types.go | 7 +++++++
apis/config/v1beta1/defaults.go | 4 ++++
apis/config/v1beta1/defaults_test.go | 15 ++++++++++++---
apis/kueue/v1alpha1/multikueue_types.go | 10 ----------
.../crd/kueue.x-k8s.io_multikueueclusters.yaml | 11 -----------
.../kueue/v1alpha1/multikueueclusterspec.go | 9 ---------
cmd/kueue/main.go | 5 ++++-
cmd/kueue/main_test.go | 5 ++++-
.../kueue.x-k8s.io_multikueueclusters.yaml | 11 -----------
pkg/config/config_test.go | 12 ++++++++++--
.../admissionchecks/multikueue/controllers.go | 12 +++++++++++-
.../multikueue/jobset_adapter_test.go | 2 +-
.../multikueue/multikueuecluster.go | 17 ++++++-----------
.../multikueue/multikueuecluster_test.go | 2 +-
.../admissionchecks/multikueue/workload_test.go | 2 +-
.../en/docs/reference/kueue-config.v1beta1.md | 10 ++++++++++
16 files changed, 71 insertions(+), 63 deletions(-)
diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go
index 6f34643969..7efeb08c6f 100644
--- a/apis/config/v1beta1/configuration_types.go
+++ b/apis/config/v1beta1/configuration_types.go
@@ -207,6 +207,13 @@ type MultiKueue struct {
// Defaults to 1min. If 0, the garbage collection is disabled.
// +optional
GCTimeout *metav1.Duration `json:"gcTimeout,omitempty"`
+
+ // A label value used to track the creator of workloads in the worker clusters.
+ // This is used by multikueue in components like its garbage collector to identify
+ // remote objects that were created by this multikueue manager cluster and delete
+ // them if their local counterpart no longer exists.
+ // +optional
+ Origin string `json:"origin,omitempty"`
}
type RequeuingTimestamp string
diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go
index 0a56724d9f..f2d01dbd6b 100644
--- a/apis/config/v1beta1/defaults.go
+++ b/apis/config/v1beta1/defaults.go
@@ -45,6 +45,7 @@ const (
DefaultClusterQueuesMaxCount int32 = 10
defaultJobFrameworkName = "batch/job"
DefaultMultiKueueGCTimeout = time.Minute
+ DefaultMultiKueueOrigin = "multikueue"
)
func getOperatorNamespace() string {
@@ -169,4 +170,7 @@ func SetDefaults_Configuration(cfg *Configuration) {
if cfg.MultiKueue.GCTimeout == nil {
cfg.MultiKueue.GCTimeout = &metav1.Duration{Duration: DefaultMultiKueueGCTimeout}
}
+ if cfg.MultiKueue.Origin == "" {
+ cfg.MultiKueue.Origin = DefaultMultiKueueOrigin
+ }
}
diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go
index a720b6091d..ad253ec55c 100644
--- a/apis/config/v1beta1/defaults_test.go
+++ b/apis/config/v1beta1/defaults_test.go
@@ -96,7 +96,10 @@ func TestSetDefaults_Configuration(t *testing.T) {
},
}
- defaultMultiKueue := &MultiKueue{GCTimeout: &metav1.Duration{Duration: DefaultMultiKueueGCTimeout}}
+ defaultMultiKueue := &MultiKueue{
+ GCTimeout: &metav1.Duration{Duration: DefaultMultiKueueGCTimeout},
+ Origin: DefaultMultiKueueOrigin,
+ }
podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout}
podsReadyTimeoutOverwrite := metav1.Duration{Duration: time.Minute}
@@ -486,7 +489,10 @@ func TestSetDefaults_Configuration(t *testing.T) {
InternalCertManagement: &InternalCertManagement{
Enable: ptr.To(false),
},
- MultiKueue: &MultiKueue{GCTimeout: &metav1.Duration{Duration: time.Second}},
+ MultiKueue: &MultiKueue{
+ GCTimeout: &metav1.Duration{Duration: time.Second},
+ Origin: "multikueue-manager1",
+ },
},
want: &Configuration{
Namespace: ptr.To(DefaultNamespace),
@@ -497,7 +503,10 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
- MultiKueue: &MultiKueue{GCTimeout: &metav1.Duration{Duration: time.Second}},
+ MultiKueue: &MultiKueue{
+ GCTimeout: &metav1.Duration{Duration: time.Second},
+ Origin: "multikueue-manager1",
+ },
},
},
}
diff --git a/apis/kueue/v1alpha1/multikueue_types.go b/apis/kueue/v1alpha1/multikueue_types.go
index af6eebca7b..e0357843d9 100644
--- a/apis/kueue/v1alpha1/multikueue_types.go
+++ b/apis/kueue/v1alpha1/multikueue_types.go
@@ -53,16 +53,6 @@ type KubeConfig struct {
type MultiKueueClusterSpec struct {
// Information how to connect to the cluster.
KubeConfig KubeConfig `json:"kubeConfig"`
-
- // A label value used to track the creator of workloads in the worker cluster.
- // This is used by the garbage collector to identify remote objects that ware created via
- // this cluster and delete them if their local counterpart no longer exists.
- //
- // +kubebuilder:validation:Optional
- // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="Value is immutable"
- // +kubebuilder:validation:MaxLength=63
- // +kubebuilder:default=multikueue
- Origin string `json:"origin,omitempty"`
}
type MultiKueueClusterStatus struct {
diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
index 4a172cacda..6d8d1b3413 100644
--- a/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
+++ b/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
@@ -66,17 +66,6 @@ spec:
- location
- locationType
type: object
- origin:
- default: multikueue
- description: A label value used to track the creator of workloads
- in the worker cluster. This is used by the garbage collector to
- identify remote objects that ware created via this cluster and delete
- them if their local counterpart no longer exists.
- maxLength: 63
- type: string
- x-kubernetes-validations:
- - message: Value is immutable
- rule: self == oldSelf
required:
- kubeConfig
type: object
diff --git a/client-go/applyconfiguration/kueue/v1alpha1/multikueueclusterspec.go b/client-go/applyconfiguration/kueue/v1alpha1/multikueueclusterspec.go
index d4c70f8e67..e7e304f7ea 100644
--- a/client-go/applyconfiguration/kueue/v1alpha1/multikueueclusterspec.go
+++ b/client-go/applyconfiguration/kueue/v1alpha1/multikueueclusterspec.go
@@ -21,7 +21,6 @@ package v1alpha1
// with apply.
type MultiKueueClusterSpecApplyConfiguration struct {
KubeConfig *KubeConfigApplyConfiguration `json:"kubeConfig,omitempty"`
- Origin *string `json:"origin,omitempty"`
}
// MultiKueueClusterSpecApplyConfiguration constructs an declarative configuration of the MultiKueueClusterSpec type for use with
@@ -37,11 +36,3 @@ func (b *MultiKueueClusterSpecApplyConfiguration) WithKubeConfig(value *KubeConf
b.KubeConfig = value
return b
}
-
-// WithOrigin sets the Origin field in the declarative configuration to the given value
-// and returns the receiver, so that objects can be built by chaining "With" function invocations.
-// If called multiple times, the Origin field is set to the value of the last call.
-func (b *MultiKueueClusterSpecApplyConfiguration) WithOrigin(value string) *MultiKueueClusterSpecApplyConfiguration {
- b.Origin = &value
- return b
-}
diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go
index b2bfee611d..adf25f1b1b 100644
--- a/cmd/kueue/main.go
+++ b/cmd/kueue/main.go
@@ -245,7 +245,10 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
}
if features.Enabled(features.MultiKueue) {
- if err := multikueue.SetupControllers(mgr, *cfg.Namespace, multikueue.WithGCInterval(cfg.MultiKueue.GCTimeout.Duration)); err != nil {
+ if err := multikueue.SetupControllers(mgr, *cfg.Namespace,
+ multikueue.WithGCInterval(cfg.MultiKueue.GCTimeout.Duration),
+ multikueue.WithOrigin(cfg.MultiKueue.Origin),
+ ); err != nil {
setupLog.Error(err, "Could not setup MultiKueue controller")
os.Exit(1)
}
diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go
index 7c883ccd2a..23c659c02f 100644
--- a/cmd/kueue/main_test.go
+++ b/cmd/kueue/main_test.go
@@ -117,7 +117,10 @@ integrations:
MaxCount: config.DefaultClusterQueuesMaxCount,
},
},
- MultiKueue: &config.MultiKueue{GCTimeout: &metav1.Duration{Duration: config.DefaultMultiKueueGCTimeout}},
+ MultiKueue: &config.MultiKueue{
+ GCTimeout: &metav1.Duration{Duration: config.DefaultMultiKueueGCTimeout},
+ Origin: config.DefaultMultiKueueOrigin,
+ },
},
},
{
diff --git a/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml b/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
index fb704666dc..837bfc144f 100644
--- a/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
+++ b/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
@@ -53,17 +53,6 @@ spec:
- location
- locationType
type: object
- origin:
- default: multikueue
- description: A label value used to track the creator of workloads
- in the worker cluster. This is used by the garbage collector to
- identify remote objects that ware created via this cluster and delete
- them if their local counterpart no longer exists.
- maxLength: 63
- type: string
- x-kubernetes-validations:
- - message: Value is immutable
- rule: self == oldSelf
required:
- kubeConfig
type: object
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 461514cc75..85e5c36ba6 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -265,6 +265,7 @@ kind: Configuration
namespace: kueue-system
multiKueue:
gcTimeout: 1m30s
+ origin: multikueue-manager1
`), os.FileMode(0600)); err != nil {
t.Fatal(err)
}
@@ -333,7 +334,10 @@ multiKueue:
},
}
- defaultMultiKueue := &configapi.MultiKueue{GCTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCTimeout}}
+ defaultMultiKueue := &configapi.MultiKueue{
+ GCTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCTimeout},
+ Origin: configapi.DefaultMultiKueueOrigin,
+ }
testcases := []struct {
name string
@@ -794,7 +798,10 @@ multiKueue:
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
- MultiKueue: &configapi.MultiKueue{GCTimeout: &metav1.Duration{Duration: 90 * time.Second}},
+ MultiKueue: &configapi.MultiKueue{
+ GCTimeout: &metav1.Duration{Duration: 90 * time.Second},
+ Origin: "multikueue-manager1",
+ },
},
wantOptions: defaultControlOptions,
},
@@ -907,6 +914,7 @@ func TestEncode(t *testing.T) {
},
"multiKueue": map[string]any{
"gcTimeout": "1m0s",
+ "origin": "multikueue",
},
},
},
diff --git a/pkg/controller/admissionchecks/multikueue/controllers.go b/pkg/controller/admissionchecks/multikueue/controllers.go
index 0691c2cfb1..0861327dd6 100644
--- a/pkg/controller/admissionchecks/multikueue/controllers.go
+++ b/pkg/controller/admissionchecks/multikueue/controllers.go
@@ -24,10 +24,12 @@ import (
const (
defaultGCInterval = time.Minute
+ defaultOrigin = "multikueue"
)
type SetupOptions struct {
gcInterval time.Duration
+ origin string
}
type SetupOption func(o *SetupOptions)
@@ -40,9 +42,17 @@ func WithGCInterval(i time.Duration) SetupOption {
}
}
+// WithOrigin - sets the multikueue-origin label value used by this manager
+func WithOrigin(origin string) SetupOption {
+ return func(o *SetupOptions) {
+ o.origin = origin
+ }
+}
+
func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
options := &SetupOptions{
gcInterval: defaultGCInterval,
+ origin: defaultOrigin,
}
for _, o := range opts {
@@ -54,7 +64,7 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e
return err
}
- cRec := newClustersReconciler(mgr.GetClient(), namespace, options.gcInterval)
+ cRec := newClustersReconciler(mgr.GetClient(), namespace, options.gcInterval, options.origin)
err = cRec.setupWithManager(mgr)
if err != nil {
return err
diff --git a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
index 5ac3826e20..4bbf164dcb 100644
--- a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
+++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
@@ -233,7 +233,7 @@ func TestWlReconcileJobset(t *testing.T) {
managerClient := manageBuilder.Build()
- cRec := newClustersReconciler(managerClient, TestNamespace, 0)
+ cRec := newClustersReconciler(managerClient, TestNamespace, 0, defaultOrigin)
worker1Builder, _ := getClientBuilder()
worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.worker1Workloads}, &jobset.JobSetList{Items: tc.worker1JobSets})
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
index ebe383e8f4..424dbce97b 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
@@ -47,10 +47,6 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)
-const (
- defaultOrigin = "multikueue"
-)
-
type clientWithWatchBuilder func(config []byte, options client.Options) (client.WithWatch, error)
type remoteClient struct {
@@ -197,6 +193,9 @@ type clustersReconciler struct {
// gcInterval - time waiting between two GC runs.
gcInterval time.Duration
+ // origin - the multikueue-origin label value used by this manager.
+ origin string
+
// rootContext - holds the context passed by the controller-runtime on Start.
// It's used to create child contexts for MultiKueueClusters client watch routines
// that will gracefully end when the controller-manager stops.
@@ -282,12 +281,7 @@ func (c *clustersReconciler) Reconcile(ctx context.Context, req reconcile.Reques
return reconcile.Result{}, c.updateStatus(ctx, cluster, false, "BadConfig", err.Error())
}
- origin := cluster.Spec.Origin
- if len(origin) == 0 {
- origin = defaultOrigin
- }
-
- if err := c.setRemoteClientConfig(ctx, cluster.Name, kubeConfig, origin); err != nil {
+ if err := c.setRemoteClientConfig(ctx, cluster.Name, kubeConfig, c.origin); err != nil {
log.Error(err, "setting kubeconfig")
return reconcile.Result{}, c.updateStatus(ctx, cluster, false, "ClientConnectionFailed", err.Error())
}
@@ -372,13 +366,14 @@ func (c *clustersReconciler) runGC(ctx context.Context) {
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=multikueueclusters,verbs=get;list;watch
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=multikueueclusters/status,verbs=get;update;patch
-func newClustersReconciler(c client.Client, namespace string, gcInterval time.Duration) *clustersReconciler {
+func newClustersReconciler(c client.Client, namespace string, gcInterval time.Duration, origin string) *clustersReconciler {
return &clustersReconciler{
localClient: c,
configNamespace: namespace,
remoteClients: make(map[string]*remoteClient),
wlUpdateCh: make(chan event.GenericEvent, 10),
gcInterval: gcInterval,
+ origin: origin,
}
}
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
index 59f4eb94c7..1b6bb17405 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
@@ -194,7 +194,7 @@ func TestUpdateConfig(t *testing.T) {
builder = builder.WithStatusSubresource(slices.Map(tc.clusters, func(c *kueuealpha.MultiKueueCluster) client.Object { return c })...)
c := builder.Build()
- reconciler := newClustersReconciler(c, TestNamespace, 0)
+ reconciler := newClustersReconciler(c, TestNamespace, 0, defaultOrigin)
if len(tc.remoteClients) > 0 {
reconciler.remoteClients = tc.remoteClients
diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go
index 4993a16a80..d06de7c15e 100644
--- a/pkg/controller/admissionchecks/multikueue/workload_test.go
+++ b/pkg/controller/admissionchecks/multikueue/workload_test.go
@@ -310,7 +310,7 @@ func TestWlReconcile(t *testing.T) {
managerClient := manageBuilder.Build()
- cRec := newClustersReconciler(managerClient, TestNamespace, 0)
+ cRec := newClustersReconciler(managerClient, TestNamespace, 0, defaultOrigin)
worker1Builder, _ := getClientBuilder()
worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.worker1Workloads}, &batchv1.JobList{Items: tc.worker1Jobs})
diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md
index 06f49ab03f..45fd3decb7 100644
--- a/site/content/en/docs/reference/kueue-config.v1beta1.md
+++ b/site/content/en/docs/reference/kueue-config.v1beta1.md
@@ -501,6 +501,16 @@ Defaults to kueue-webhook-server-cert.
Defaults to 1min. If 0, the garbage collection is disabled.
+origin
+string
+ |
+
+ A label value used to track the creator of workloads in the worker clusters.
+This is used by multikueue in components like its garbage collector to identify
+remote objects that ware created by this multikueue manager cluster and delete
+them if their local counterpart no longer exists.
+ |
+
From 47260a030f8b622420c0a1863c2f37a4cf664ea7 Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Fri, 2 Feb 2024 12:20:08 +0200
Subject: [PATCH 06/12] Review Remarks
---
apis/config/v1beta1/configuration_types.go | 2 +-
apis/kueue/v1alpha1/multikueue_types.go | 4 ++++
.../multikueue/multikueuecluster.go | 8 +++++---
.../multikueue/multikueuecluster_test.go | 10 +++++-----
.../admissionchecks/multikueue/workload.go | 7 ++-----
.../admissionchecks/multikueue/workload_test.go | 14 +++++++-------
.../en/docs/reference/kueue-config.v1beta1.md | 2 +-
7 files changed, 25 insertions(+), 22 deletions(-)
diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go
index 7efeb08c6f..d099d7b6b2 100644
--- a/apis/config/v1beta1/configuration_types.go
+++ b/apis/config/v1beta1/configuration_types.go
@@ -203,7 +203,7 @@ type WaitForPodsReady struct {
}
type MultiKueue struct {
- // GCTimeout defines the timeout between two consecutive garbage collection runs.
+ // GCTimeout defines the time interval between two consecutive garbage collection runs.
// Defaults to 1min. If 0, the garbage collection is disabled.
// +optional
GCTimeout *metav1.Duration `json:"gcTimeout,omitempty"`
diff --git a/apis/kueue/v1alpha1/multikueue_types.go b/apis/kueue/v1alpha1/multikueue_types.go
index e0357843d9..80cd3ab121 100644
--- a/apis/kueue/v1alpha1/multikueue_types.go
+++ b/apis/kueue/v1alpha1/multikueue_types.go
@@ -23,6 +23,10 @@ import (
const (
MultiKueueConfigSecretKey = "kubeconfig"
MultiKueueClusterActive = "Active"
+
+ // MultiKueueOriginLabel is a label used to track the creator
+ // of multikueue remote objects.
+ MultiKueueOriginLabel = "kueue.x-k8s.io/multikueue-origin"
)
type LocationType string
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
index 424dbce97b..7592ee001b 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
@@ -140,7 +140,7 @@ func (rc *remoteClient) queueWorkloadEvent(ctx context.Context, ev watch.Event)
func (rc *remoteClient) runGC(ctx context.Context) {
log := ctrl.LoggerFrom(ctx)
lst := &kueue.WorkloadList{}
- err := rc.client.List(ctx, lst, client.MatchingLabels{MultiKueueOriginLabelKey: rc.origin})
+ err := rc.client.List(ctx, lst, client.MatchingLabels{kueuealpha.MultiKueueOriginLabel: rc.origin})
if err != nil {
log.V(2).Error(err, "Listing remote workloads")
return
@@ -156,7 +156,7 @@ func (rc *remoteClient) runGC(ctx context.Context) {
}
if err == nil && localWl.DeletionTimestamp.IsZero() {
- // The remote workload is still relevant.
+ // The local workload exists and isn't being deleted, so the remote workload is still relevant.
continue
}
@@ -167,12 +167,14 @@ func (rc *remoteClient) runGC(ctx context.Context) {
if adapter, found := adapters[adapterKey]; !found {
wlLog.V(2).Info("No adapter found", "adapterKey", adapterKey, "ownerKey", ownerKey)
} else {
+ wlLog.V(5).Info("MultiKueueGC deleting workload owner", "ownerKey", ownerKey, "ownnerKind", controller)
err := adapter.DeleteRemoteObject(ctx, rc.client, ownerKey)
if client.IgnoreNotFound(err) != nil {
wlLog.V(2).Error(err, "Deleting remote workload's owner", "ownerKey", ownerKey)
}
}
}
+ wlLog.V(5).Info("MultiKueueGC deleting remote workload")
if err := rc.client.Delete(ctx, &remoteWl); client.IgnoreNotFound(err) != nil {
wlLog.V(2).Error(err, "Deleting remote workload")
}
@@ -354,7 +356,7 @@ func (c *clustersReconciler) runGC(ctx context.Context) {
log.V(2).Info("Garbage Collector Stopped")
return
case <-time.After(c.gcInterval):
- log.V(5).Info("Run Garbage Collection")
+ log.V(4).Info("Run Garbage Collection for Lost Remote Workloads")
for clusterName, rc := range c.remoteClients {
rc.runGC(ctrl.LoggerInto(ctx, log.WithValues("multiKueueCluster", clusterName)))
}
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
index 1b6bb17405..804661e81d 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
@@ -248,7 +248,7 @@ func TestRemoteClientGC(t *testing.T) {
},
workersWorkloads: []kueue.Workload{
*baseWlBuilder.Clone().
- Label(MultiKueueOriginLabelKey, defaultOrigin).
+ Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
},
managersJobs: []batchv1.Job{
@@ -265,7 +265,7 @@ func TestRemoteClientGC(t *testing.T) {
},
wantWorkersWorkloads: []kueue.Workload{
*baseWlBuilder.Clone().
- Label(MultiKueueOriginLabelKey, defaultOrigin).
+ Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
},
wantManagersJobs: []batchv1.Job{
@@ -280,7 +280,7 @@ func TestRemoteClientGC(t *testing.T) {
"missing workers and their owner jobs are deleted": {
workersWorkloads: []kueue.Workload{
*baseWlBuilder.Clone().
- Label(MultiKueueOriginLabelKey, defaultOrigin).
+ Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
},
managersJobs: []batchv1.Job{
@@ -299,7 +299,7 @@ func TestRemoteClientGC(t *testing.T) {
"unrelated workers and jobs are not deleted": {
workersWorkloads: []kueue.Workload{
*baseWlBuilder.Clone().
- Label(MultiKueueOriginLabelKey, "other-gc-key").
+ Label(kueuealpha.MultiKueueOriginLabel, "other-gc-key").
Obj(),
},
workersJobs: []batchv1.Job{
@@ -308,7 +308,7 @@ func TestRemoteClientGC(t *testing.T) {
},
wantWorkersWorkloads: []kueue.Workload{
*baseWlBuilder.Clone().
- Label(MultiKueueOriginLabelKey, "other-gc-key").
+ Label(kueuealpha.MultiKueueOriginLabel, "other-gc-key").
Obj(),
},
wantWorkersJobs: []batchv1.Job{
diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go
index e805a821da..8d0c4dd291 100644
--- a/pkg/controller/admissionchecks/multikueue/workload.go
+++ b/pkg/controller/admissionchecks/multikueue/workload.go
@@ -38,6 +38,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
+ kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/workload"
@@ -52,10 +53,6 @@ var (
errNoActiveClusters = errors.New("no active clusters")
)
-const (
- MultiKueueOriginLabelKey = "kueue.x-k8s.io/multikueue-origin"
-)
-
type wlReconciler struct {
client client.Client
helper *multiKueueStoreHelper
@@ -385,7 +382,7 @@ func cloneForCreate(orig *kueue.Workload, origin string) *kueue.Workload {
if remoteWl.Labels == nil {
remoteWl.Labels = make(map[string]string)
}
- remoteWl.Labels[MultiKueueOriginLabelKey] = origin
+ remoteWl.Labels[kueuealpha.MultiKueueOriginLabel] = origin
orig.Spec.DeepCopyInto(&remoteWl.Spec)
return remoteWl
}
diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go
index d06de7c15e..f798758ce0 100644
--- a/pkg/controller/admissionchecks/multikueue/workload_test.go
+++ b/pkg/controller/admissionchecks/multikueue/workload_test.go
@@ -96,7 +96,7 @@ func TestWlReconcile(t *testing.T) {
},
worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
- Label(MultiKueueOriginLabelKey, defaultOrigin).
+ Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
@@ -124,7 +124,7 @@ func TestWlReconcile(t *testing.T) {
},
wantWorker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
- Label(MultiKueueOriginLabelKey, defaultOrigin).
+ Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
},
},
@@ -145,7 +145,7 @@ func TestWlReconcile(t *testing.T) {
worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
- Label(MultiKueueOriginLabelKey, defaultOrigin).
+ Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
@@ -165,7 +165,7 @@ func TestWlReconcile(t *testing.T) {
wantWorker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
- Label(MultiKueueOriginLabelKey, defaultOrigin).
+ Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
@@ -202,7 +202,7 @@ func TestWlReconcile(t *testing.T) {
worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
- Label(MultiKueueOriginLabelKey, defaultOrigin).
+ Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: "by test"}).
Obj(),
@@ -227,7 +227,7 @@ func TestWlReconcile(t *testing.T) {
wantWorker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
- Label(MultiKueueOriginLabelKey, defaultOrigin).
+ Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: "by test"}).
Obj(),
@@ -269,7 +269,7 @@ func TestWlReconcile(t *testing.T) {
worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
- Label(MultiKueueOriginLabelKey, defaultOrigin).
+ Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: "by test"}).
Obj(),
diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md
index 45fd3decb7..c58d3e4d11 100644
--- a/site/content/en/docs/reference/kueue-config.v1beta1.md
+++ b/site/content/en/docs/reference/kueue-config.v1beta1.md
@@ -497,7 +497,7 @@ Defaults to kueue-webhook-server-cert.
k8s.io/apimachinery/pkg/apis/meta/v1.Duration
- GCTimeout defines the timeout between two consecutive garbage collection runs.
+ GCTimeout defines the time interval between two consecutive garbage collection runs.
Defaults to 1min. If 0, the garbage collection is disabled.
|
From c9befdfd3c9dfa2756d3a407fbf99e4c4cf0f80b Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Fri, 2 Feb 2024 13:11:57 +0200
Subject: [PATCH 07/12] Use GCInterval in config
---
apis/config/v1beta1/configuration_types.go | 4 ++--
apis/config/v1beta1/defaults.go | 4 ++--
apis/config/v1beta1/defaults_test.go | 12 ++++++------
apis/config/v1beta1/zz_generated.deepcopy.go | 4 ++--
cmd/kueue/main.go | 2 +-
cmd/kueue/main_test.go | 4 ++--
pkg/config/config_test.go | 14 +++++++-------
.../en/docs/reference/kueue-config.v1beta1.md | 4 ++--
8 files changed, 24 insertions(+), 24 deletions(-)
diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go
index d099d7b6b2..cf9780c433 100644
--- a/apis/config/v1beta1/configuration_types.go
+++ b/apis/config/v1beta1/configuration_types.go
@@ -203,10 +203,10 @@ type WaitForPodsReady struct {
}
type MultiKueue struct {
- // GCTimeout defines the time interval between two consecutive garbage collection runs.
+ // GCInterval defines the time interval between two consecutive garbage collection runs.
// Defaults to 1min. If 0, the garbage collection is disabled.
// +optional
- GCTimeout *metav1.Duration `json:"gcTimeout,omitempty"`
+ GCInterval *metav1.Duration `json:"gcInterval,omitempty"`
// A label value used to track the creator of workloads in the worker clusters.
// This is used by multikueue in components like its garbage collector to identify
diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go
index f2d01dbd6b..022ae05b3b 100644
--- a/apis/config/v1beta1/defaults.go
+++ b/apis/config/v1beta1/defaults.go
@@ -167,8 +167,8 @@ func SetDefaults_Configuration(cfg *Configuration) {
if cfg.MultiKueue == nil {
cfg.MultiKueue = &MultiKueue{}
}
- if cfg.MultiKueue.GCTimeout == nil {
- cfg.MultiKueue.GCTimeout = &metav1.Duration{Duration: DefaultMultiKueueGCTimeout}
+ if cfg.MultiKueue.GCInterval == nil {
+ cfg.MultiKueue.GCInterval = &metav1.Duration{Duration: DefaultMultiKueueGCTimeout}
}
if cfg.MultiKueue.Origin == "" {
cfg.MultiKueue.Origin = DefaultMultiKueueOrigin
diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go
index ad253ec55c..137c95ede6 100644
--- a/apis/config/v1beta1/defaults_test.go
+++ b/apis/config/v1beta1/defaults_test.go
@@ -97,8 +97,8 @@ func TestSetDefaults_Configuration(t *testing.T) {
}
defaultMultiKueue := &MultiKueue{
- GCTimeout: &metav1.Duration{Duration: DefaultMultiKueueGCTimeout},
- Origin: DefaultMultiKueueOrigin,
+ GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCTimeout},
+ Origin: DefaultMultiKueueOrigin,
}
podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout}
@@ -490,8 +490,8 @@ func TestSetDefaults_Configuration(t *testing.T) {
Enable: ptr.To(false),
},
MultiKueue: &MultiKueue{
- GCTimeout: &metav1.Duration{Duration: time.Second},
- Origin: "multikueue-manager1",
+ GCInterval: &metav1.Duration{Duration: time.Second},
+ Origin: "multikueue-manager1",
},
},
want: &Configuration{
@@ -504,8 +504,8 @@ func TestSetDefaults_Configuration(t *testing.T) {
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
MultiKueue: &MultiKueue{
- GCTimeout: &metav1.Duration{Duration: time.Second},
- Origin: "multikueue-manager1",
+ GCInterval: &metav1.Duration{Duration: time.Second},
+ Origin: "multikueue-manager1",
},
},
},
diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go
index d52585577a..a7ef407639 100644
--- a/apis/config/v1beta1/zz_generated.deepcopy.go
+++ b/apis/config/v1beta1/zz_generated.deepcopy.go
@@ -291,8 +291,8 @@ func (in *InternalCertManagement) DeepCopy() *InternalCertManagement {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *MultiKueue) DeepCopyInto(out *MultiKueue) {
*out = *in
- if in.GCTimeout != nil {
- in, out := &in.GCTimeout, &out.GCTimeout
+ if in.GCInterval != nil {
+ in, out := &in.GCInterval, &out.GCInterval
*out = new(v1.Duration)
**out = **in
}
diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go
index adf25f1b1b..9b7a273641 100644
--- a/cmd/kueue/main.go
+++ b/cmd/kueue/main.go
@@ -246,7 +246,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
if features.Enabled(features.MultiKueue) {
if err := multikueue.SetupControllers(mgr, *cfg.Namespace,
- multikueue.WithGCInterval(cfg.MultiKueue.GCTimeout.Duration),
+ multikueue.WithGCInterval(cfg.MultiKueue.GCInterval.Duration),
multikueue.WithOrigin(cfg.MultiKueue.Origin),
); err != nil {
setupLog.Error(err, "Could not setup MultiKueue controller")
diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go
index 23c659c02f..ab6eafe369 100644
--- a/cmd/kueue/main_test.go
+++ b/cmd/kueue/main_test.go
@@ -118,8 +118,8 @@ integrations:
},
},
MultiKueue: &config.MultiKueue{
- GCTimeout: &metav1.Duration{Duration: config.DefaultMultiKueueGCTimeout},
- Origin: config.DefaultMultiKueueOrigin,
+ GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCTimeout},
+ Origin: config.DefaultMultiKueueOrigin,
},
},
},
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 85e5c36ba6..77f7facf43 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -264,7 +264,7 @@ apiVersion: config.kueue.x-k8s.io/v1beta1
kind: Configuration
namespace: kueue-system
multiKueue:
- gcTimeout: 1m30s
+ gcInterval: 1m30s
origin: multikueue-manager1
`), os.FileMode(0600)); err != nil {
t.Fatal(err)
@@ -335,8 +335,8 @@ multiKueue:
}
defaultMultiKueue := &configapi.MultiKueue{
- GCTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCTimeout},
- Origin: configapi.DefaultMultiKueueOrigin,
+ GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCTimeout},
+ Origin: configapi.DefaultMultiKueueOrigin,
}
testcases := []struct {
@@ -799,8 +799,8 @@ multiKueue:
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
MultiKueue: &configapi.MultiKueue{
- GCTimeout: &metav1.Duration{Duration: 90 * time.Second},
- Origin: "multikueue-manager1",
+ GCInterval: &metav1.Duration{Duration: 90 * time.Second},
+ Origin: "multikueue-manager1",
},
},
wantOptions: defaultControlOptions,
@@ -913,8 +913,8 @@ func TestEncode(t *testing.T) {
"clusterQueues": map[string]any{"maxCount": int64(10)},
},
"multiKueue": map[string]any{
- "gcTimeout": "1m0s",
- "origin": "multikueue",
+ "gcInterval": "1m0s",
+ "origin": "multikueue",
},
},
},
diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md
index c58d3e4d11..b65dd2f092 100644
--- a/site/content/en/docs/reference/kueue-config.v1beta1.md
+++ b/site/content/en/docs/reference/kueue-config.v1beta1.md
@@ -493,11 +493,11 @@ Defaults to kueue-webhook-server-cert.
-gcTimeout
+ |
gcInterval
k8s.io/apimachinery/pkg/apis/meta/v1.Duration
|
- GCTimeout defines the time interval between two consecutive garbage collection runs.
+ GCInterval defines the time interval between two consecutive garbage collection runs.
Defaults to 1min. If 0, the garbage collection is disabled.
|
From 0be9168c16c8dc5c931a85cfac45c19721c53e2c Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Fri, 2 Feb 2024 13:50:02 +0200
Subject: [PATCH 08/12] Review remarks
---
.../multikueue/multikueuecluster_test.go | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
index 804661e81d..424f78c1e6 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
@@ -277,7 +277,22 @@ func TestRemoteClientGC(t *testing.T) {
Obj(),
},
},
- "missing workers and their owner jobs are deleted": {
+ "missing worker workloads are deleted": {
+ workersWorkloads: []kueue.Workload{
+ *baseWlBuilder.Clone().
+ Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
+ Obj(),
+ },
+ managersJobs: []batchv1.Job{
+ *baseJobBuilder.Clone().
+ Obj(),
+ },
+ wantManagersJobs: []batchv1.Job{
+ *baseJobBuilder.Clone().
+ Obj(),
+ },
+ },
+ "missing worker workloads and their owner jobs are deleted": {
workersWorkloads: []kueue.Workload{
*baseWlBuilder.Clone().
Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
From cf085cd83794b4a66cdd12e467b4aacee3a70a45 Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Fri, 2 Feb 2024 16:03:18 +0200
Subject: [PATCH 09/12] Rename gc timeout default const
---
apis/config/v1beta1/defaults.go | 4 ++--
apis/config/v1beta1/defaults_test.go | 2 +-
cmd/kueue/main_test.go | 2 +-
pkg/config/config_test.go | 2 +-
4 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go
index 022ae05b3b..a547efe3c1 100644
--- a/apis/config/v1beta1/defaults.go
+++ b/apis/config/v1beta1/defaults.go
@@ -44,7 +44,7 @@ const (
DefaultQueueVisibilityUpdateIntervalSeconds int32 = 5
DefaultClusterQueuesMaxCount int32 = 10
defaultJobFrameworkName = "batch/job"
- DefaultMultiKueueGCTimeout = time.Minute
+ DefaultMultiKueueGCInterval = time.Minute
DefaultMultiKueueOrigin = "multikueue"
)
@@ -168,7 +168,7 @@ func SetDefaults_Configuration(cfg *Configuration) {
cfg.MultiKueue = &MultiKueue{}
}
if cfg.MultiKueue.GCInterval == nil {
- cfg.MultiKueue.GCInterval = &metav1.Duration{Duration: DefaultMultiKueueGCTimeout}
+ cfg.MultiKueue.GCInterval = &metav1.Duration{Duration: DefaultMultiKueueGCInterval}
}
if cfg.MultiKueue.Origin == "" {
cfg.MultiKueue.Origin = DefaultMultiKueueOrigin
diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go
index 137c95ede6..a2f085b740 100644
--- a/apis/config/v1beta1/defaults_test.go
+++ b/apis/config/v1beta1/defaults_test.go
@@ -97,7 +97,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
}
defaultMultiKueue := &MultiKueue{
- GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCTimeout},
+ GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCInterval},
Origin: DefaultMultiKueueOrigin,
}
diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go
index ab6eafe369..3d12e4cfdb 100644
--- a/cmd/kueue/main_test.go
+++ b/cmd/kueue/main_test.go
@@ -118,7 +118,7 @@ integrations:
},
},
MultiKueue: &config.MultiKueue{
- GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCTimeout},
+ GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval},
Origin: config.DefaultMultiKueueOrigin,
},
},
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 77f7facf43..8a71607c79 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -335,7 +335,7 @@ multiKueue:
}
defaultMultiKueue := &configapi.MultiKueue{
- GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCTimeout},
+ GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCInterval},
Origin: configapi.DefaultMultiKueueOrigin,
}
From f04f5a41a58c548bf9056c0aeef54af67aad2ec3 Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Mon, 5 Feb 2024 09:21:38 +0200
Subject: [PATCH 10/12] Review Remarks and UT scenario cleanup.
---
apis/config/v1beta1/configuration_types.go | 5 +-
apis/config/v1beta1/defaults_test.go | 25 ++++++++++
.../multikueue/multikueuecluster.go | 6 +--
.../multikueue/multikueuecluster_test.go | 47 ++++---------------
.../en/docs/reference/kueue-config.v1beta1.md | 5 +-
5 files changed, 43 insertions(+), 45 deletions(-)
diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go
index cf9780c433..c3717b3615 100644
--- a/apis/config/v1beta1/configuration_types.go
+++ b/apis/config/v1beta1/configuration_types.go
@@ -208,9 +208,10 @@ type MultiKueue struct {
// +optional
GCInterval *metav1.Duration `json:"gcInterval,omitempty"`
- // A label value used to track the creator of workloads in the worker clusters.
+ // Origin defines a label value used to track the creator of workloads in the worker
+ // clusters.
// This is used by multikueue in components like its garbage collector to identify
- // remote objects that ware created by this multikueue manager cluster and delete
+ // remote objects that ware created by this multikueue manager cluster and delete
// them if their local counterpart no longer exists.
// +optional
Origin string `json:"origin,omitempty"`
diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go
index a2f085b740..f6db84b39e 100644
--- a/apis/config/v1beta1/defaults_test.go
+++ b/apis/config/v1beta1/defaults_test.go
@@ -509,6 +509,31 @@ func TestSetDefaults_Configuration(t *testing.T) {
},
},
},
+ "multiKueue GCInterval 0": {
+ original: &Configuration{
+ InternalCertManagement: &InternalCertManagement{
+ Enable: ptr.To(false),
+ },
+ MultiKueue: &MultiKueue{
+ GCInterval: &metav1.Duration{},
+ Origin: "multikueue-manager1",
+ },
+ },
+ want: &Configuration{
+ Namespace: ptr.To(DefaultNamespace),
+ ControllerManager: defaultCtrlManagerConfigurationSpec,
+ InternalCertManagement: &InternalCertManagement{
+ Enable: ptr.To(false),
+ },
+ ClientConnection: defaultClientConnection,
+ Integrations: defaultIntegrations,
+ QueueVisibility: defaultQueueVisibility,
+ MultiKueue: &MultiKueue{
+ GCInterval: &metav1.Duration{},
+ Origin: "multikueue-manager1",
+ },
+ },
+ },
}
for name, tc := range testCases {
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
index 7592ee001b..88a6e8f0c4 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
@@ -134,9 +134,9 @@ func (rc *remoteClient) queueWorkloadEvent(ctx context.Context, ev watch.Event)
}
}
-// runGc - lists all the remote workloads having the same multikueue-origin and remove those who
-// no longer have a local corespondent (missing or awaiting deletion). If the remote workload is
-// is owned by a job,also delete the job.
+// runGC - lists all the remote workloads having the same multikueue-origin and remove those who
+// no longer have a local correspondent (missing or awaiting deletion). If the remote workload
+// is owned by a job, also delete the job.
func (rc *remoteClient) runGC(ctx context.Context) {
log := ctrl.LoggerFrom(ctx)
lst := &kueue.WorkloadList{}
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
index 424f78c1e6..c2074986a4 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
@@ -236,10 +236,8 @@ func TestRemoteClientGC(t *testing.T) {
managersJobs []batchv1.Job
workersJobs []batchv1.Job
- wantManagersWorkloads []kueue.Workload
- wantWorkersWorkloads []kueue.Workload
- wantManagersJobs []batchv1.Job
- wantWorkersJobs []batchv1.Job
+ wantWorkersWorkloads []kueue.Workload
+ wantWorkersJobs []batchv1.Job
}{
"existing workers and jobs are not deleted": {
managersWorkloads: []kueue.Workload{
@@ -259,19 +257,11 @@ func TestRemoteClientGC(t *testing.T) {
*baseJobBuilder.Clone().
Obj(),
},
- wantManagersWorkloads: []kueue.Workload{
- *baseWlBuilder.Clone().
- Obj(),
- },
wantWorkersWorkloads: []kueue.Workload{
*baseWlBuilder.Clone().
Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
},
- wantManagersJobs: []batchv1.Job{
- *baseJobBuilder.Clone().
- Obj(),
- },
wantWorkersJobs: []batchv1.Job{
*baseJobBuilder.Clone().
Obj(),
@@ -287,8 +277,12 @@ func TestRemoteClientGC(t *testing.T) {
*baseJobBuilder.Clone().
Obj(),
},
- wantManagersJobs: []batchv1.Job{
- *baseJobBuilder.Clone().
+ },
+ "missing worker workloads are deleted (no job adapter)": {
+ workersWorkloads: []kueue.Workload{
+ *baseWlBuilder.Clone().
+ OwnerReference(batchv1.SchemeGroupVersion.WithKind("NptAJob"), "job1", "test-uuid", true, true).
+ Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
},
},
@@ -306,10 +300,6 @@ func TestRemoteClientGC(t *testing.T) {
*baseJobBuilder.Clone().
Obj(),
},
- wantManagersJobs: []batchv1.Job{
- *baseJobBuilder.Clone().
- Obj(),
- },
},
"unrelated workers and jobs are not deleted": {
workersWorkloads: []kueue.Workload{
@@ -353,18 +343,8 @@ func TestRemoteClientGC(t *testing.T) {
w1remoteClient.runGC(ctx)
- gotManagersWokloads := &kueue.WorkloadList{}
- err := managerClient.List(ctx, gotManagersWokloads)
- if err != nil {
- t.Error("unexpected list manager's workloads error")
- }
-
- if diff := cmp.Diff(tc.wantManagersWorkloads, gotManagersWokloads.Items, objCheckOpts...); diff != "" {
- t.Errorf("unexpected manager's workloads (-want/+got):\n%s", diff)
- }
-
gotWorker1Wokloads := &kueue.WorkloadList{}
- err = worker1Client.List(ctx, gotWorker1Wokloads)
+ err := worker1Client.List(ctx, gotWorker1Wokloads)
if err != nil {
t.Error("unexpected list worker's workloads error")
}
@@ -372,15 +352,6 @@ func TestRemoteClientGC(t *testing.T) {
if diff := cmp.Diff(tc.wantWorkersWorkloads, gotWorker1Wokloads.Items, objCheckOpts...); diff != "" {
t.Errorf("unexpected worker's workloads (-want/+got):\n%s", diff)
}
- gotManagersJobs := &batchv1.JobList{}
- err = managerClient.List(ctx, gotManagersJobs)
- if err != nil {
- t.Error("unexpected list manager's jobs error")
- }
-
- if diff := cmp.Diff(tc.wantManagersJobs, gotManagersJobs.Items, objCheckOpts...); diff != "" {
- t.Errorf("unexpected manager's jobs (-want/+got):\n%s", diff)
- }
gotWorker1Job := &batchv1.JobList{}
err = worker1Client.List(ctx, gotWorker1Job)
diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md
index b65dd2f092..8fbe37d483 100644
--- a/site/content/en/docs/reference/kueue-config.v1beta1.md
+++ b/site/content/en/docs/reference/kueue-config.v1beta1.md
@@ -505,9 +505,10 @@ Defaults to 1min. If 0, the garbage collection is disabled.
string
- A label value used to track the creator of workloads in the worker clusters.
+ Origin defines a label value used to track the creator of workloads in the worker
+clusters.
This is used by multikueue in components like its garbage collector to identify
-remote objects that ware created by this multikueue manager cluster and delete
+remote objects that were created by this multikueue manager cluster and delete
them if their local counterpart no longer exists.
|
From d531485f01f5ae3b5640b2ae105ea4c1dd99041b Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Mon, 5 Feb 2024 18:30:21 +0200
Subject: [PATCH 11/12] [config] Make multikueue Origin pointer
---
apis/config/v1beta1/configuration_types.go | 4 ++--
apis/config/v1beta1/defaults.go | 4 ++--
apis/config/v1beta1/defaults_test.go | 10 +++++-----
apis/config/v1beta1/zz_generated.deepcopy.go | 5 +++++
cmd/kueue/main.go | 3 ++-
cmd/kueue/main_test.go | 2 +-
pkg/config/config_test.go | 4 ++--
7 files changed, 19 insertions(+), 13 deletions(-)
diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go
index c3717b3615..7a7cd35d85 100644
--- a/apis/config/v1beta1/configuration_types.go
+++ b/apis/config/v1beta1/configuration_types.go
@@ -206,7 +206,7 @@ type MultiKueue struct {
// GCInterval defines the time interval between two consecutive garbage collection runs.
// Defaults to 1min. If 0, the garbage collection is disabled.
// +optional
- GCInterval *metav1.Duration `json:"gcInterval,omitempty"`
+ GCInterval *metav1.Duration `json:"gcInterval"`
// Origin defines a label value used to track the creator of workloads in the worker
// clusters.
@@ -214,7 +214,7 @@ type MultiKueue struct {
// remote objects that ware created by this multikueue manager cluster and delete
// them if their local counterpart no longer exists.
// +optional
- Origin string `json:"origin,omitempty"`
+ Origin *string `json:"origin,omitempty"`
}
type RequeuingTimestamp string
diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go
index a547efe3c1..9de538b39c 100644
--- a/apis/config/v1beta1/defaults.go
+++ b/apis/config/v1beta1/defaults.go
@@ -170,7 +170,7 @@ func SetDefaults_Configuration(cfg *Configuration) {
if cfg.MultiKueue.GCInterval == nil {
cfg.MultiKueue.GCInterval = &metav1.Duration{Duration: DefaultMultiKueueGCInterval}
}
- if cfg.MultiKueue.Origin == "" {
- cfg.MultiKueue.Origin = DefaultMultiKueueOrigin
+ if ptr.Deref(cfg.MultiKueue.Origin, "") == "" {
+ cfg.MultiKueue.Origin = ptr.To(DefaultMultiKueueOrigin)
}
}
diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go
index f6db84b39e..8d90562580 100644
--- a/apis/config/v1beta1/defaults_test.go
+++ b/apis/config/v1beta1/defaults_test.go
@@ -98,7 +98,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
defaultMultiKueue := &MultiKueue{
GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCInterval},
- Origin: DefaultMultiKueueOrigin,
+ Origin: ptr.To(DefaultMultiKueueOrigin),
}
podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout}
@@ -491,7 +491,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
},
MultiKueue: &MultiKueue{
GCInterval: &metav1.Duration{Duration: time.Second},
- Origin: "multikueue-manager1",
+ Origin: ptr.To("multikueue-manager1"),
},
},
want: &Configuration{
@@ -505,7 +505,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
QueueVisibility: defaultQueueVisibility,
MultiKueue: &MultiKueue{
GCInterval: &metav1.Duration{Duration: time.Second},
- Origin: "multikueue-manager1",
+ Origin: ptr.To("multikueue-manager1"),
},
},
},
@@ -516,7 +516,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
},
MultiKueue: &MultiKueue{
GCInterval: &metav1.Duration{},
- Origin: "multikueue-manager1",
+ Origin: ptr.To("multikueue-manager1"),
},
},
want: &Configuration{
@@ -530,7 +530,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
QueueVisibility: defaultQueueVisibility,
MultiKueue: &MultiKueue{
GCInterval: &metav1.Duration{},
- Origin: "multikueue-manager1",
+ Origin: ptr.To("multikueue-manager1"),
},
},
},
diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go
index a7ef407639..31f2197c31 100644
--- a/apis/config/v1beta1/zz_generated.deepcopy.go
+++ b/apis/config/v1beta1/zz_generated.deepcopy.go
@@ -296,6 +296,11 @@ func (in *MultiKueue) DeepCopyInto(out *MultiKueue) {
*out = new(v1.Duration)
**out = **in
}
+ if in.Origin != nil {
+ in, out := &in.Origin, &out.Origin
+ *out = new(string)
+ **out = **in
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MultiKueue.
diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go
index 9b7a273641..ea1acd7365 100644
--- a/cmd/kueue/main.go
+++ b/cmd/kueue/main.go
@@ -25,6 +25,7 @@ import (
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
+ "k8s.io/utils/ptr"
zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -247,7 +248,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
if features.Enabled(features.MultiKueue) {
if err := multikueue.SetupControllers(mgr, *cfg.Namespace,
multikueue.WithGCInterval(cfg.MultiKueue.GCInterval.Duration),
- multikueue.WithOrigin(cfg.MultiKueue.Origin),
+ multikueue.WithOrigin(ptr.Deref(cfg.MultiKueue.Origin, configapi.DefaultMultiKueueOrigin)),
); err != nil {
setupLog.Error(err, "Could not setup MultiKueue controller")
os.Exit(1)
diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go
index 3d12e4cfdb..9b470a6304 100644
--- a/cmd/kueue/main_test.go
+++ b/cmd/kueue/main_test.go
@@ -119,7 +119,7 @@ integrations:
},
MultiKueue: &config.MultiKueue{
GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval},
- Origin: config.DefaultMultiKueueOrigin,
+ Origin: ptr.To(config.DefaultMultiKueueOrigin),
},
},
},
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 8a71607c79..6d858d7720 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -336,7 +336,7 @@ multiKueue:
defaultMultiKueue := &configapi.MultiKueue{
GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCInterval},
- Origin: configapi.DefaultMultiKueueOrigin,
+ Origin: ptr.To(configapi.DefaultMultiKueueOrigin),
}
testcases := []struct {
@@ -800,7 +800,7 @@ multiKueue:
QueueVisibility: defaultQueueVisibility,
MultiKueue: &configapi.MultiKueue{
GCInterval: &metav1.Duration{Duration: 90 * time.Second},
- Origin: "multikueue-manager1",
+ Origin: ptr.To("multikueue-manager1"),
},
},
wantOptions: defaultControlOptions,
From ebe1e328048158d3b59e8803eeaaba60f8190d4f Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Mon, 5 Feb 2024 19:05:24 +0200
Subject: [PATCH 12/12] Review remarks
---
cmd/kueue/main.go | 2 +-
.../admissionchecks/multikueue/multikueuecluster.go | 6 +++---
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go
index ea1acd7365..92dced5870 100644
--- a/cmd/kueue/main.go
+++ b/cmd/kueue/main.go
@@ -25,7 +25,6 @@ import (
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
- "k8s.io/utils/ptr"
zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -39,6 +38,7 @@ import (
"k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
+ "k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
index 88a6e8f0c4..2375d34dab 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
@@ -162,13 +162,13 @@ func (rc *remoteClient) runGC(ctx context.Context) {
// if the remote wl has a controller(owning Job), delete the job
if controller := metav1.GetControllerOf(&remoteWl); controller != nil {
- ownerKey := types.NamespacedName{Name: controller.Name, Namespace: remoteWl.Namespace}
+ ownerKey := klog.KRef(remoteWl.Namespace, controller.Name)
adapterKey := schema.FromAPIVersionAndKind(controller.APIVersion, controller.Kind).String()
if adapter, found := adapters[adapterKey]; !found {
wlLog.V(2).Info("No adapter found", "adapterKey", adapterKey, "ownerKey", ownerKey)
} else {
wlLog.V(5).Info("MultiKueueGC deleting workload owner", "ownerKey", ownerKey, "ownnerKind", controller)
- err := adapter.DeleteRemoteObject(ctx, rc.client, ownerKey)
+ err := adapter.DeleteRemoteObject(ctx, rc.client, types.NamespacedName{Name: controller.Name, Namespace: remoteWl.Namespace})
if client.IgnoreNotFound(err) != nil {
wlLog.V(2).Error(err, "Deleting remote workload's owner", "ownerKey", ownerKey)
}
@@ -344,7 +344,7 @@ func (c *clustersReconciler) updateStatus(ctx context.Context, cluster *kueuealp
}
func (c *clustersReconciler) runGC(ctx context.Context) {
- log := ctrl.LoggerFrom(ctx)
+ log := ctrl.LoggerFrom(ctx).WithName("MultiKueueGC")
if c.gcInterval == 0 {
log.V(2).Info("Garbage Collection is disabled")
return