diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 41a49cce345..7fb0516ace1 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -146,6 +146,7 @@ func StartOperator(kubeconfig string) { kubeInformerFactory.Core().V1().Services(), hostsTemplate, pkg.Version, + minioInformerFactory, ) go kubeInformerFactory.Start(stopCh) diff --git a/pkg/controller/job-controller.go b/pkg/controller/job-controller.go new file mode 100644 index 00000000000..adabf74a59b --- /dev/null +++ b/pkg/controller/job-controller.go @@ -0,0 +1,136 @@ +// This file is part of MinIO Operator +// Copyright (c) 2024 MinIO, Inc. + +package controller + +import ( + "context" + "fmt" + "time" + + corelisters "k8s.io/client-go/listers/core/v1" + + informers "github.com/minio/operator/pkg/client/informers/externalversions/job.min.io/v1alpha1" + listers "github.com/minio/operator/pkg/client/listers/job.min.io/v1alpha1" + "golang.org/x/time/rate" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes" + appslisters "k8s.io/client-go/listers/apps/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +type jobController struct { + lister listers.MinIOJobLister + hasSynced cache.InformerSynced + workqueue workqueue.RateLimitingInterface + kubeClientSet kubernetes.Interface + statefulSetLister appslisters.StatefulSetLister + recorder record.EventRecorder +} + +type controllerConfig struct { + serviceLister corelisters.ServiceLister + kubeClientSet kubernetes.Interface + statefulSetLister appslisters.StatefulSetLister + deploymentLister appslisters.DeploymentLister + recorder record.EventRecorder +} + +// JobControllerInterface is an interface for the controller with the methods supported by it. +type JobControllerInterface interface { + WorkQueue() workqueue.RateLimitingInterface + KeyFunc() cache.KeyFunc + HasSynced() cache.InformerSynced + SyncHandler(ctx context.Context, name, namespace string) error + HandleObject(obj metav1.Object) +} + +func enqueue(c JobControllerInterface, obj interface{}) { + var key string + var err error + if key, err = c.KeyFunc()(obj); err != nil { + utilruntime.HandleError(err) + return + } + c.WorkQueue().Add(key) +} + +func newJobController(informer informers.MinIOJobInformer, config controllerConfig) *jobController { + rateLimiter := workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, + ) + jobController := &jobController{ + lister: informer.Lister(), + hasSynced: informer.Informer().HasSynced, + workqueue: workqueue.NewRateLimitingQueue(rateLimiter), + kubeClientSet: config.kubeClientSet, + statefulSetLister: config.statefulSetLister, + recorder: config.recorder, + } + // Set up an event handler for when resources change + informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + enqueue(jobController, obj) + }, + UpdateFunc: func(old, new interface{}) { + enqueue(jobController, new) + }, + }) + return jobController +} + +func (c *jobController) WorkQueue() workqueue.RateLimitingInterface { + return c.workqueue +} + +func (c *jobController) KeyFunc() cache.KeyFunc { + return cache.MetaNamespaceKeyFunc +} + +func (c *jobController) HasSynced() cache.InformerSynced { + return c.hasSynced +} + +func (c *jobController) HandleObject(obj metav1.Object) { + JobCRDResourceKind := "MinIOJob" + if ownerRef := metav1.GetControllerOf(obj); ownerRef != nil { + switch ownerRef.Kind { + case JobCRDResourceKind: + job, err := c.lister.MinIOJobs(obj.GetNamespace()).Get(ownerRef.Name) + if err != nil { + klog.V(4).Info("Ignore orphaned object", "object", klog.KObj(job), JobCRDResourceKind, ownerRef.Name) + return + } + enqueue(c, job) + default: + return + } + return + } +} + +// syncJobHandler compares the current Job state with the desired, and attempts to +// converge the two. It then updates the Status block of the Job resource +// with the current status of the resource. +func (c *jobController) SyncHandler(ctx context.Context, name, namespace string) error { + // Get the Job resource with this namespace/name + _, err := c.lister.MinIOJobs(namespace).Get(name) + if err != nil { + // The Job resource may no longer exist, in which case we stop + // processing. + if errors.IsNotFound(err) { + utilruntime.HandleError(fmt.Errorf("job '%s' in work queue no longer exists: %+v", name, err)) + return nil + } + + return err + } + + return nil +} diff --git a/pkg/controller/main-controller.go b/pkg/controller/main-controller.go index b0567f9562c..6450c3170f5 100644 --- a/pkg/controller/main-controller.go +++ b/pkg/controller/main-controller.go @@ -73,6 +73,7 @@ import ( miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2" clientset "github.com/minio/operator/pkg/client/clientset/versioned" minioscheme "github.com/minio/operator/pkg/client/clientset/versioned/scheme" + minioinformers "github.com/minio/operator/pkg/client/informers/externalversions" informers "github.com/minio/operator/pkg/client/informers/externalversions/minio.min.io/v2" stsInformers "github.com/minio/operator/pkg/client/informers/externalversions/sts.min.io/v1alpha1" "github.com/minio/operator/pkg/resources/statefulsets" @@ -196,6 +197,11 @@ type Controller struct { // policyBindingListerSynced returns true if the PolicyBinding shared informer // has synced at least once. policyBindingListerSynced cache.InformerSynced + + // controllers denotes the list of components controlled + // by the controller. Each component is itself + // a controller. This handle is for supporting the abstraction. + controllers []JobControllerInterface } // EventType is Event type to handle @@ -231,6 +237,7 @@ func NewController( serviceInformer coreinformers.ServiceInformer, hostsTemplate, operatorVersion string, + minioInformerFactory minioinformers.SharedInformerFactory, ) *Controller { // Create event broadcaster // Add minio-controller types to the default Kubernetes Scheme so Events can be @@ -271,6 +278,14 @@ func NewController( oprImg = env.Get(DefaultOperatorImageEnv, oprImg) + controllerConfig := controllerConfig{ + serviceLister: serviceInformer.Lister(), + kubeClientSet: kubeClientSet, + statefulSetLister: statefulSetInformer.Lister(), + deploymentLister: deploymentInformer.Lister(), + recorder: recorder, + } + controller := &Controller{ podName: podName, namespacesToWatch: namespacesToWatch, @@ -293,6 +308,9 @@ func NewController( operatorVersion: operatorVersion, policyBindingListerSynced: policyBindingInformer.Informer().HasSynced, operatorImage: oprImg, + controllers: []JobControllerInterface{ + newJobController(minioInformerFactory.Job().V1alpha1().MinIOJobs(), controllerConfig), + }, } // Initialize operator HTTP upgrade server handlers