diff --git a/cmd/clusterlink/controller-manager/app/controllerstarter.go b/cmd/clusterlink/controller-manager/app/controllerstarter.go index 33076e819..e47dd923b 100644 --- a/cmd/clusterlink/controller-manager/app/controllerstarter.go +++ b/cmd/clusterlink/controller-manager/app/controllerstarter.go @@ -19,10 +19,11 @@ import ( "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/keys" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) type Controller struct { - processor utils.AsyncWorker + processor lifted.AsyncWorker clusterLinkClient versioned.Interface clusterLister v1alpha1.ClusterLister mgr ctrl.Manager @@ -44,15 +45,15 @@ func NewController(clusterLinkClient *versioned.Clientset, mgr ctrl.Manager, opt func (c *Controller) Start(ctx context.Context) error { stopCh := ctx.Done() c.ctx = ctx - opt := utils.Options{ + opt := lifted.WorkerOptions{ Name: "cluster Controller", - KeyFunc: func(obj interface{}) (utils.QueueKey, error) { + KeyFunc: func(obj interface{}) (lifted.QueueKey, error) { return keys.ClusterWideKeyFunc(obj) }, ReconcileFunc: c.Reconcile, RateLimiterOptions: c.opts.RateLimiterOpts, } - c.processor = utils.NewAsyncWorker(opt) + c.processor = lifted.NewAsyncWorker(opt) clusterInformerFactory := externalversions.NewSharedInformerFactory(c.clusterLinkClient, 0) clusterInformer := clusterInformerFactory.Kosmos().V1alpha1().Clusters().Informer() @@ -110,7 +111,7 @@ func (c *Controller) OnDelete(obj interface{}) { c.OnAdd(obj) } -func (c *Controller) Reconcile(key utils.QueueKey) error { +func (c *Controller) Reconcile(key lifted.QueueKey) error { cluster, err := c.clusterLister.Get(c.opts.ClusterName) if err != nil { return err diff --git a/cmd/clusterlink/controller-manager/app/options/options.go b/cmd/clusterlink/controller-manager/app/options/options.go index babd4c7b1..113c8aa4a 100644 --- a/cmd/clusterlink/controller-manager/app/options/options.go +++ b/cmd/clusterlink/controller-manager/app/options/options.go @@ -9,13 +9,13 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" "github.com/kosmos.io/kosmos/pkg/utils" - "github.com/kosmos.io/kosmos/pkg/utils/flags" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) type ControllerManagerOptions struct { Controllers []string - RateLimiterOpts flags.Options + RateLimiterOpts lifted.RateLimitOptions ControlPanelConfig string diff --git a/cmd/clustertree/cluster-manager/app/extersion_apps.go b/cmd/clustertree/cluster-manager/app/extersion_apps.go index 1f9b11c36..cab4ebfbd 100644 --- a/cmd/clustertree/cluster-manager/app/extersion_apps.go +++ b/cmd/clustertree/cluster-manager/app/extersion_apps.go @@ -15,7 +15,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/extensions/daemonset" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" - "github.com/kosmos.io/kosmos/pkg/utils/flags" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) // StartHostDaemonSetsController starts a new HostDaemonSetsController. @@ -67,7 +67,7 @@ func StartDistributeController(ctx context.Context, opts *options.Options, workN } kosmosFactory := externalversions.NewSharedInformerFactory(kosmosClient, 0) - option := flags.Options{} + option := lifted.RateLimitOptions{} controller := daemonset.NewDistributeController( kosmosClient, kosmosFactory.Kosmos().V1alpha1().ShadowDaemonSets(), @@ -98,7 +98,7 @@ func StartDaemonSetsController(ctx context.Context, opts *options.Options, workN } kosmosFactory := externalversions.NewSharedInformerFactory(kosmosClient, 0) - option := flags.Options{} + option := lifted.RateLimitOptions{} controller := daemonset.NewDaemonSetsController( kosmosFactory.Kosmos().V1alpha1().ShadowDaemonSets(), kosmosFactory.Kosmos().V1alpha1().DaemonSets(), @@ -131,7 +131,7 @@ func StartDaemonSetsMirrorController(ctx context.Context, opts *options.Options, } kosmosFactory := externalversions.NewSharedInformerFactory(kosmosClient, 0) kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0) - option := flags.Options{} + option := lifted.RateLimitOptions{} controller := daemonset.NewDaemonSetsMirrorController( kosmosClient, kubeClient, @@ -162,7 +162,7 @@ func StartPodReflectController(ctx context.Context, opts *options.Options, workN } kosmosFactory := externalversions.NewSharedInformerFactory(kosmosClient, 0) kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0) - option := flags.Options{} + option := lifted.RateLimitOptions{} controller := daemonset.NewPodReflectorController( kubeClient, kubeFactory.Apps().V1().DaemonSets(), diff --git a/cmd/clustertree/cluster-manager/app/options/options.go b/cmd/clustertree/cluster-manager/app/options/options.go index a2b031f71..e55936fa1 100644 --- a/cmd/clustertree/cluster-manager/app/options/options.go +++ b/cmd/clustertree/cluster-manager/app/options/options.go @@ -10,6 +10,7 @@ import ( componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1" "github.com/kosmos.io/kosmos/pkg/utils/flags" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) const ( @@ -45,7 +46,7 @@ type Options struct { // ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources ReservedNamespaces []string - RateLimiterOpts flags.Options + RateLimiterOpts lifted.RateLimitOptions BackoffOpts flags.BackoffOptions diff --git a/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go b/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go index 72f40c749..f3edf14da 100644 --- a/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go +++ b/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go @@ -28,8 +28,8 @@ import ( "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/utils" - "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/keys" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" "github.com/kosmos.io/kosmos/pkg/utils/net" ) @@ -249,9 +249,9 @@ func (e *EtcdBackend) ListIPPools() ([]*ExternalClusterIPPool, []IPPool, error) type Controller struct { globalExtIPPoolSet ExternalIPPoolSet - RateLimiterOptions flags.Options + RateLimiterOptions lifted.RateLimitOptions clusterName string - processor utils.AsyncWorker + processor lifted.AsyncWorker clusterLinkClient *versioned.Clientset clusterLister v1alpha1.ClusterLister kubeClient *kubernetes.Clientset @@ -292,15 +292,15 @@ func (c *Controller) OnDelete(obj interface{}) { func (c *Controller) Start(ctx context.Context) error { klog.Infof("Starting CalicoIPPool Controller.") c.stopCh = ctx.Done() - opt := utils.Options{ + opt := lifted.WorkerOptions{ Name: "cluster Controller", - KeyFunc: func(obj interface{}) (utils.QueueKey, error) { + KeyFunc: func(obj interface{}) (lifted.QueueKey, error) { return keys.ClusterWideKeyFunc(obj) }, ReconcileFunc: c.Reconcile, RateLimiterOptions: c.RateLimiterOptions, } - c.processor = utils.NewAsyncWorker(opt) + c.processor = lifted.NewAsyncWorker(opt) factory := externalversions.NewSharedInformerFactory(c.clusterLinkClient, 0) informer := factory.Kosmos().V1alpha1().Clusters().Informer() @@ -322,7 +322,7 @@ func (c *Controller) Start(ctx context.Context) error { return nil } -func (c *Controller) Reconcile(key utils.QueueKey) error { +func (c *Controller) Reconcile(key lifted.QueueKey) error { clusterWideKey, ok := key.(keys.ClusterWideKey) if !ok { klog.Error("invalid key") diff --git a/pkg/clusterlink/controllers/cluster/cluster_controller.go b/pkg/clusterlink/controllers/cluster/cluster_controller.go index c3353484f..120e454f1 100644 --- a/pkg/clusterlink/controllers/cluster/cluster_controller.go +++ b/pkg/clusterlink/controllers/cluster/cluster_controller.go @@ -33,8 +33,8 @@ import ( clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/utils" - "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/keys" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) // KubeFlannelNetworkConfig @@ -52,9 +52,9 @@ type SetClusterPodCIDRFun func(cluster *clusterlinkv1alpha1.Cluster) error type Controller struct { // RateLimiterOptions is the configuration for rate limiter which may significantly influence the performance of // the Controller. - RateLimiterOptions flags.Options + RateLimiterOptions lifted.RateLimitOptions clusterName string - processor utils.AsyncWorker + processor lifted.AsyncWorker kubeClient *kubernetes.Clientset dynamicClient *dynamic.DynamicClient podLister v1.PodLister @@ -84,15 +84,15 @@ func (c *Controller) Start(ctx context.Context) error { klog.Infof("Starting cluster Controller.") c.stopCh = ctx.Done() - opt := utils.Options{ + opt := lifted.WorkerOptions{ Name: "cluster Controller", - KeyFunc: func(obj interface{}) (utils.QueueKey, error) { + KeyFunc: func(obj interface{}) (lifted.QueueKey, error) { return keys.ClusterWideKeyFunc(obj) }, ReconcileFunc: c.Reconcile, RateLimiterOptions: c.RateLimiterOptions, } - c.processor = utils.NewAsyncWorker(opt) + c.processor = lifted.NewAsyncWorker(opt) factory := informers.NewSharedInformerFactory(c.kubeClient, 0) informer := factory.Core().V1().Pods().Informer() @@ -162,7 +162,7 @@ func (c *Controller) Start(ctx context.Context) error { return nil } -func (c *Controller) Reconcile(key utils.QueueKey) error { +func (c *Controller) Reconcile(key lifted.QueueKey) error { clusterWideKey, ok := key.(keys.ClusterWideKey) if !ok { klog.Error("invalid key") diff --git a/pkg/clusterlink/controllers/context/context.go b/pkg/clusterlink/controllers/context/context.go index 0e024cbf5..89645551d 100644 --- a/pkg/clusterlink/controllers/context/context.go +++ b/pkg/clusterlink/controllers/context/context.go @@ -10,7 +10,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" - "github.com/kosmos.io/kosmos/pkg/utils/flags" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) type Options struct { @@ -18,7 +18,7 @@ type Options struct { Controllers []string ControlPanelConfig *rest.Config ClusterName string - RateLimiterOpts flags.Options + RateLimiterOpts lifted.RateLimitOptions } // Context defines the context object for controller. diff --git a/pkg/clusterlink/controllers/nodecidr/adapter.go b/pkg/clusterlink/controllers/nodecidr/adapter.go index 0b45f447d..fff0744cb 100644 --- a/pkg/clusterlink/controllers/nodecidr/adapter.go +++ b/pkg/clusterlink/controllers/nodecidr/adapter.go @@ -19,7 +19,7 @@ import ( clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) type cniAdapter interface { @@ -35,12 +35,12 @@ type commonAdapter struct { config *rest.Config nodeLister lister.NodeLister clusterNodeLister clusterlister.ClusterNodeLister - processor utils.AsyncWorker + processor lifted.AsyncWorker } func NewCommonAdapter(config *rest.Config, clusterNodeLister clusterlister.ClusterNodeLister, - processor utils.AsyncWorker) *commonAdapter { + processor lifted.AsyncWorker) *commonAdapter { return &commonAdapter{ config: config, clusterNodeLister: clusterNodeLister, @@ -119,12 +119,12 @@ type calicoAdapter struct { config *rest.Config blockLister cache.GenericLister clusterNodeLister clusterlister.ClusterNodeLister - processor utils.AsyncWorker + processor lifted.AsyncWorker } func NewCalicoAdapter(config *rest.Config, clusterNodeLister clusterlister.ClusterNodeLister, - processor utils.AsyncWorker) *calicoAdapter { + processor lifted.AsyncWorker) *calicoAdapter { return &calicoAdapter{ config: config, clusterNodeLister: clusterNodeLister, @@ -250,7 +250,7 @@ func (c *calicoAdapter) OnDelete(obj interface{}) { requeue(node, c.clusterNodeLister, c.processor) } -func requeue(originNodeName string, clusterNodeLister clusterlister.ClusterNodeLister, processor utils.AsyncWorker) { +func requeue(originNodeName string, clusterNodeLister clusterlister.ClusterNodeLister, processor lifted.AsyncWorker) { clusterNodes, err := clusterNodeLister.List(labels.Everything()) if err != nil { klog.Errorf("list clusterNodes err: %v", err) diff --git a/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go b/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go index f564befbe..87195187d 100644 --- a/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go +++ b/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go @@ -23,9 +23,8 @@ import ( "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" clusterinformer "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions/kosmos/v1alpha1" clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/utils" - "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/keys" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) const ( @@ -34,7 +33,7 @@ const ( calicoCNI = "calico" ) -type controller struct { +type Controller struct { clusterName string config *rest.Config @@ -42,9 +41,9 @@ type controller struct { nodeLister lister.NodeLister // RateLimiterOptions is the configuration for rate limiter which may significantly influence the performance of - // the controller. - RateLimiterOptions flags.Options - processor utils.AsyncWorker + // the Controller. + RateLimiterOptions lifted.RateLimitOptions + processor lifted.AsyncWorker clusterNodeInformer clusterinformer.ClusterNodeInformer clusterNodeLister clusterlister.ClusterNodeLister @@ -53,8 +52,8 @@ type controller struct { ctx context.Context } -func NewNodeCIDRController(config *rest.Config, clusterName string, clusterLinkClient versioned.Interface, RateLimiterOptions flags.Options, context context.Context) *controller { - return &controller{ +func NewNodeCIDRController(config *rest.Config, clusterName string, clusterLinkClient versioned.Interface, RateLimiterOptions lifted.RateLimitOptions, context context.Context) *Controller { + return &Controller{ clusterLinkClient: clusterLinkClient, config: config, RateLimiterOptions: RateLimiterOptions, @@ -63,16 +62,16 @@ func NewNodeCIDRController(config *rest.Config, clusterName string, clusterLinkC } } -func (c *controller) Start(ctx context.Context) error { - klog.Infof("Starting node cidr controller.") +func (c *Controller) Start(ctx context.Context) error { + klog.Infof("Starting node cidr Controller.") - opt := utils.Options{ - Name: "node cidr controller", + opt := lifted.WorkerOptions{ + Name: "node cidr Controller", KeyFunc: ClusterWideKeyFunc, ReconcileFunc: c.Reconcile, RateLimiterOptions: c.RateLimiterOptions, } - c.processor = utils.NewAsyncWorker(opt) + c.processor = lifted.NewAsyncWorker(opt) clusterInformerFactory := externalversions.NewSharedInformerFactory(c.clusterLinkClient, 0) @@ -131,11 +130,11 @@ func (c *controller) Start(ctx context.Context) error { // last step : start processor and waiting for close chan c.processor.Run(workNum, stopCh) <-stopCh - klog.Infof("Stop node cidr controller as process done.") + klog.Infof("Stop node cidr Controller as process done.") return nil } -func (c *controller) Reconcile(key utils.QueueKey) error { +func (c *Controller) Reconcile(key lifted.QueueKey) error { clusterWideKey, ok := key.(keys.ClusterWideKey) if !ok { klog.Error("invalid key") @@ -219,12 +218,12 @@ func (c *controller) Reconcile(key utils.QueueKey) error { } // ClusterWideKeyFunc generates a ClusterWideKey for object. -func ClusterWideKeyFunc(obj interface{}) (utils.QueueKey, error) { +func ClusterWideKeyFunc(obj interface{}) (lifted.QueueKey, error) { return keys.ClusterWideKeyFunc(obj) } // OnAdd handles object add event and push the object to queue. -func (c *controller) OnAdd(obj interface{}) { +func (c *Controller) OnAdd(obj interface{}) { runtimeObj, ok := obj.(runtime.Object) if !ok { return @@ -233,16 +232,16 @@ func (c *controller) OnAdd(obj interface{}) { } // OnUpdate handles object update event and push the object to queue. -func (c *controller) OnUpdate(oldObj, newObj interface{}) { +func (c *Controller) OnUpdate(oldObj, newObj interface{}) { c.OnAdd(newObj) } // OnDelete handles object delete event and push the object to queue. -func (c *controller) OnDelete(obj interface{}) { +func (c *Controller) OnDelete(obj interface{}) { c.OnAdd(obj) } -func (c *controller) EventFilter(obj interface{}) bool { +func (c *Controller) EventFilter(obj interface{}) bool { //todo return true } diff --git a/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go index f3f860fe8..e0c4bbb43 100644 --- a/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go @@ -30,6 +30,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/helper" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) const ServiceExportControllerName = "service-export-controller" @@ -41,7 +42,7 @@ type ServiceExportController struct { Logger logr.Logger // ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources ReservedNamespaces []string - RateLimiterOptions flags.Options + RateLimiterOptions lifted.RateLimitOptions BackoffOptions flags.BackoffOptions } @@ -115,7 +116,7 @@ func (c *ServiceExportController) SetupWithManager(mgr manager.Manager) error { endpointSlicePredicate, ). WithOptions(controller.Options{ - RateLimiter: flags.DefaultControllerRateLimiter(c.RateLimiterOptions), + RateLimiter: lifted.DefaultControllerRateLimiter(c.RateLimiterOptions), MaxConcurrentReconciles: 2, }). Complete(c) diff --git a/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go index b7b213c63..7a8e1bf63 100644 --- a/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go @@ -32,6 +32,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/helper" "github.com/kosmos.io/kosmos/pkg/utils/keys" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) const LeafServiceImportControllerName = "leaf-service-import-controller" @@ -44,7 +45,7 @@ type ServiceImportController struct { IPFamilyType kosmosv1alpha1.IPFamilyType EventRecorder record.EventRecorder Logger logr.Logger - processor utils.AsyncWorker + processor lifted.AsyncWorker RootResourceManager *utils.ResourceManager // ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources ReservedNamespaces []string @@ -64,15 +65,15 @@ func (c *ServiceImportController) Start(ctx context.Context) error { klog.Infof("Starting %s", LeafServiceImportControllerName) defer klog.Infof("Stop %s as process done.", LeafServiceImportControllerName) - opt := utils.Options{ + opt := lifted.WorkerOptions{ Name: LeafServiceImportControllerName, - KeyFunc: func(obj interface{}) (utils.QueueKey, error) { + KeyFunc: func(obj interface{}) (lifted.QueueKey, error) { // Don't care about the GVK in the queue return keys.NamespaceWideKeyFunc(obj) }, ReconcileFunc: c.Reconcile, } - c.processor = utils.NewAsyncWorker(opt) + c.processor = lifted.NewAsyncWorker(opt) serviceImportInformerFactory := externalversions.NewSharedInformerFactory(c.LeafKosmosClient, c.SyncPeriod) serviceImportInformer := serviceImportInformerFactory.Multicluster().V1alpha1().ServiceImports() @@ -105,7 +106,7 @@ func (c *ServiceImportController) Start(ctx context.Context) error { return nil } -func (c *ServiceImportController) Reconcile(key utils.QueueKey) error { +func (c *ServiceImportController) Reconcile(key lifted.QueueKey) error { clusterWideKey, ok := key.(keys.ClusterWideKey) if !ok { klog.Error("invalid key") diff --git a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go index 306323b68..eaf33cfd0 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go @@ -33,6 +33,7 @@ import ( leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/convertpolicy" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" "github.com/kosmos.io/kosmos/pkg/utils/podutils" ) @@ -852,7 +853,7 @@ func (r *RootPodReconciler) mutatePod(ctx context.Context, pod *corev1.Pod, node } func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod, nodeSelector kosmosv1alpha1.NodeSelector) error { - if err := podutils.PopulateEnvironmentVariables(ctx, pod, r.envResourceManager); err != nil { + if err := lifted.PopulateEnvironmentVariables(ctx, pod, r.envResourceManager); err != nil { // span.SetStatus(err) return err } diff --git a/pkg/clustertree/cluster-manager/extensions/daemonset/daemonset_controller.go b/pkg/clustertree/cluster-manager/extensions/daemonset/daemonset_controller.go index 539d64bd0..cde09087d 100644 --- a/pkg/clustertree/cluster-manager/extensions/daemonset/daemonset_controller.go +++ b/pkg/clustertree/cluster-manager/extensions/daemonset/daemonset_controller.go @@ -21,9 +21,8 @@ import ( "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" kosmosinformer "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions/kosmos/v1alpha1" kosmoslister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/utils" - "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/keys" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) var ControllerKind = kosmosv1alpha1.SchemeGroupVersion.WithKind("DaemonSet") @@ -50,9 +49,9 @@ type DaemonSetsController struct { clusterSynced cache.InformerSynced - processor utils.AsyncWorker + processor lifted.AsyncWorker - rateLimiterOptions flags.Options + rateLimiterOptions lifted.RateLimitOptions } // NewDaemonSetsController returns a new DaemonSetsController @@ -62,7 +61,7 @@ func NewDaemonSetsController( clusterInformer kosmosinformer.ClusterInformer, kubeClient clientset.Interface, kosmosClient versioned.Interface, - rateLimiterOptions flags.Options, + rateLimiterOptions lifted.RateLimitOptions, ) *DaemonSetsController { err := kosmosv1alpha1.Install(scheme.Scheme) if err != nil { @@ -125,15 +124,15 @@ func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) { klog.Infof("Starting daemon set controller") defer klog.Infof("Shutting down daemon set controller") - opt := utils.Options{ + opt := lifted.WorkerOptions{ Name: "daemon set controller", - KeyFunc: func(obj interface{}) (utils.QueueKey, error) { + KeyFunc: func(obj interface{}) (lifted.QueueKey, error) { return keys.ClusterWideKeyFunc(obj) }, ReconcileFunc: dsc.syncDaemonSet, RateLimiterOptions: dsc.rateLimiterOptions, } - dsc.processor = utils.NewAsyncWorker(opt) + dsc.processor = lifted.NewAsyncWorker(opt) if !cache.WaitForNamedCacheSync("kosmos_daemonset_controller", ctx.Done(), dsc.daemonSetSynced, dsc.shadowDaemonSetSynced, dsc.clusterSynced) { klog.Errorf("Timed out waiting for caches to sync") @@ -217,7 +216,7 @@ func (dsc *DaemonSetsController) deleteKNode(obj interface{}) { dsc.processCluster(cluster) } -func (dsc *DaemonSetsController) syncDaemonSet(key utils.QueueKey) error { +func (dsc *DaemonSetsController) syncDaemonSet(key lifted.QueueKey) error { clusterWideKey, ok := key.(keys.ClusterWideKey) if !ok { klog.Errorf("invalid key type %T", key) diff --git a/pkg/clustertree/cluster-manager/extensions/daemonset/daemonset_mirror_controller.go b/pkg/clustertree/cluster-manager/extensions/daemonset/daemonset_mirror_controller.go index 4943fb7f0..a4c6b8505 100644 --- a/pkg/clustertree/cluster-manager/extensions/daemonset/daemonset_mirror_controller.go +++ b/pkg/clustertree/cluster-manager/extensions/daemonset/daemonset_mirror_controller.go @@ -21,9 +21,8 @@ import ( "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" kosmosinformer "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions/kosmos/v1alpha1" kosmoslister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/utils" - "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/keys" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) type DaemonSetsMirrorController struct { @@ -43,9 +42,9 @@ type DaemonSetsMirrorController struct { kosmosDaemonSetSynced cache.InformerSynced - processor utils.AsyncWorker + processor lifted.AsyncWorker - rateLimiterOptions flags.Options + rateLimiterOptions lifted.RateLimitOptions } func NewDaemonSetsMirrorController( @@ -53,7 +52,7 @@ func NewDaemonSetsMirrorController( kubeClient clientset.Interface, kdsInformer kosmosinformer.DaemonSetInformer, dsInformer appsinformers.DaemonSetInformer, - rateLimiterOptions flags.Options, + rateLimiterOptions lifted.RateLimitOptions, ) *DaemonSetsMirrorController { err := kosmosv1alpha1.Install(scheme.Scheme) if err != nil { @@ -107,15 +106,15 @@ func (dmc *DaemonSetsMirrorController) Run(ctx context.Context, workers int) { klog.Infof("starting daemon set mirror controller") defer klog.Infof("shutting down daemon set mirror controller") - opt := utils.Options{ + opt := lifted.WorkerOptions{ Name: "distribute controller: KNode", - KeyFunc: func(obj interface{}) (utils.QueueKey, error) { + KeyFunc: func(obj interface{}) (lifted.QueueKey, error) { return keys.ClusterWideKeyFunc(obj) }, ReconcileFunc: dmc.syncDaemonSet, RateLimiterOptions: dmc.rateLimiterOptions, } - dmc.processor = utils.NewAsyncWorker(opt) + dmc.processor = lifted.NewAsyncWorker(opt) if !cache.WaitForNamedCacheSync("daemon set mirror controller", ctx.Done(), dmc.daemonSetSynced, dmc.kosmosDaemonSetSynced) { @@ -125,7 +124,7 @@ func (dmc *DaemonSetsMirrorController) Run(ctx context.Context, workers int) { dmc.processor.Run(workers, ctx.Done()) } -func (dmc *DaemonSetsMirrorController) syncDaemonSet(key utils.QueueKey) error { +func (dmc *DaemonSetsMirrorController) syncDaemonSet(key lifted.QueueKey) error { clusterWideKey, ok := key.(keys.ClusterWideKey) if !ok { klog.Errorf("invalid key type %T", key) diff --git a/pkg/clustertree/cluster-manager/extensions/daemonset/distribute_controller.go b/pkg/clustertree/cluster-manager/extensions/daemonset/distribute_controller.go index e85581b0c..7a492dfe9 100644 --- a/pkg/clustertree/cluster-manager/extensions/daemonset/distribute_controller.go +++ b/pkg/clustertree/cluster-manager/extensions/daemonset/distribute_controller.go @@ -27,9 +27,8 @@ import ( "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" kosmosinformer "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions/kosmos/v1alpha1" kosmoslister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/utils" - "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/keys" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) // DistributeController is responsible for propagating the shadow daemon set to the member cluster @@ -44,13 +43,13 @@ type DistributeController struct { clusterSynced cache.InformerSynced - clusterProcessor utils.AsyncWorker + clusterProcessor lifted.AsyncWorker - shadowDaemonSetProcessor utils.AsyncWorker + shadowDaemonSetProcessor lifted.AsyncWorker clusterDaemonSetManagerMap map[string]*clusterDaemonSetManager - rateLimiterOptions flags.Options + rateLimiterOptions lifted.RateLimitOptions lock sync.RWMutex } @@ -59,7 +58,7 @@ func NewDistributeController( kosmosClient versioned.Interface, sdsInformer kosmosinformer.ShadowDaemonSetInformer, clusterInformer kosmosinformer.ClusterInformer, - rateLimiterOptions flags.Options, + rateLimiterOptions lifted.RateLimitOptions, ) *DistributeController { dc := &DistributeController{ kosmosClient: kosmosClient, @@ -98,25 +97,25 @@ func (dc *DistributeController) Run(ctx context.Context, workers int) { klog.Infof("Starting distribute controller") defer klog.Infof("Shutting down distribute controller") - clusterOpt := utils.Options{ + clusterOpt := lifted.WorkerOptions{ Name: "distribute controller: cluster", - KeyFunc: func(obj interface{}) (utils.QueueKey, error) { + KeyFunc: func(obj interface{}) (lifted.QueueKey, error) { return keys.ClusterWideKeyFunc(obj) }, ReconcileFunc: dc.syncCluster, RateLimiterOptions: dc.rateLimiterOptions, } - dc.clusterProcessor = utils.NewAsyncWorker(clusterOpt) + dc.clusterProcessor = lifted.NewAsyncWorker(clusterOpt) - sdsOpt := utils.Options{ + sdsOpt := lifted.WorkerOptions{ Name: "distribute controller: ShadowDaemonSet", - KeyFunc: func(obj interface{}) (utils.QueueKey, error) { + KeyFunc: func(obj interface{}) (lifted.QueueKey, error) { return keys.ClusterWideKeyFunc(obj) }, ReconcileFunc: dc.syncShadowDaemonSet, RateLimiterOptions: dc.rateLimiterOptions, } - dc.shadowDaemonSetProcessor = utils.NewAsyncWorker(sdsOpt) + dc.shadowDaemonSetProcessor = lifted.NewAsyncWorker(sdsOpt) if !cache.WaitForNamedCacheSync("host_daemon_controller", ctx.Done(), dc.shadowDaemonSetSynced, dc.clusterSynced) { klog.V(2).Infof("Timed out waiting for caches to sync") @@ -127,7 +126,7 @@ func (dc *DistributeController) Run(ctx context.Context, workers int) { dc.shadowDaemonSetProcessor.Run(workers, ctx.Done()) } -func (dc *DistributeController) syncCluster(key utils.QueueKey) error { +func (dc *DistributeController) syncCluster(key lifted.QueueKey) error { dc.lock.Lock() defer dc.lock.Unlock() clusterWideKey, ok := key.(keys.ClusterWideKey) @@ -218,7 +217,7 @@ func (dc *DistributeController) syncCluster(key utils.QueueKey) error { return dc.ensureClusterFinalizer(cluster) } -func (dc *DistributeController) syncShadowDaemonSet(key utils.QueueKey) error { +func (dc *DistributeController) syncShadowDaemonSet(key lifted.QueueKey) error { dc.lock.RLock() defer dc.lock.RUnlock() clusterWideKey, ok := key.(keys.ClusterWideKey) diff --git a/pkg/clustertree/cluster-manager/extensions/daemonset/pod_reflect_controller.go b/pkg/clustertree/cluster-manager/extensions/daemonset/pod_reflect_controller.go index f4a31d83a..213d313b4 100644 --- a/pkg/clustertree/cluster-manager/extensions/daemonset/pod_reflect_controller.go +++ b/pkg/clustertree/cluster-manager/extensions/daemonset/pod_reflect_controller.go @@ -25,9 +25,8 @@ import ( kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" kosmosinformer "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions/kosmos/v1alpha1" kosmoslister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/utils" - "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/keys" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) var KosmosDaemonSetKind = kosmosv1alpha1.SchemeGroupVersion.WithKind("DaemonSet") @@ -56,11 +55,11 @@ type PodReflectorController struct { podSynced cache.InformerSynced - clusterProcessor utils.AsyncWorker + clusterProcessor lifted.AsyncWorker - podProcessor utils.AsyncWorker + podProcessor lifted.AsyncWorker - rateLimiterOptions flags.Options + rateLimiterOptions lifted.RateLimitOptions lock sync.RWMutex } @@ -70,7 +69,7 @@ func NewPodReflectorController(kubeClient clientset.Interface, kdsInformer kosmosinformer.DaemonSetInformer, clusterInformer kosmosinformer.ClusterInformer, podInformer corev1informers.PodInformer, - rateLimiterOptions flags.Options, + rateLimiterOptions lifted.RateLimitOptions, ) *PodReflectorController { pc := &PodReflectorController{ kubeClient: kubeClient, @@ -114,19 +113,19 @@ func (pc *PodReflectorController) Run(ctx context.Context, workers int) { klog.Infof("Starting pod reflector controller") defer klog.Infof("Shutting down pod reflector controller") - clusterOpt := utils.Options{ + clusterOpt := lifted.WorkerOptions{ Name: "pod reflector controller: cluster", - KeyFunc: func(obj interface{}) (utils.QueueKey, error) { + KeyFunc: func(obj interface{}) (lifted.QueueKey, error) { return keys.ClusterWideKeyFunc(obj) }, ReconcileFunc: pc.syncCluster, RateLimiterOptions: pc.rateLimiterOptions, } - pc.clusterProcessor = utils.NewAsyncWorker(clusterOpt) + pc.clusterProcessor = lifted.NewAsyncWorker(clusterOpt) - podOpt := utils.Options{ + podOpt := lifted.WorkerOptions{ Name: "pod reflector controller: pod", - KeyFunc: func(obj interface{}) (utils.QueueKey, error) { + KeyFunc: func(obj interface{}) (lifted.QueueKey, error) { pod := obj.(*corev1.Pod) cluster := getCluster(pod) if len(cluster) == 0 { @@ -137,7 +136,7 @@ func (pc *PodReflectorController) Run(ctx context.Context, workers int) { ReconcileFunc: pc.syncPod, RateLimiterOptions: pc.rateLimiterOptions, } - pc.podProcessor = utils.NewAsyncWorker(podOpt) + pc.podProcessor = lifted.NewAsyncWorker(podOpt) if !cache.WaitForNamedCacheSync("pod_reflector_controller", ctx.Done(), pc.daemonsetSynced, pc.kdaemonsetSynced, pc.podSynced, pc.clusterSynced) { klog.Errorf("Timed out waiting for caches to sync") return @@ -150,7 +149,7 @@ func getCluster(pod *corev1.Pod) string { return pod.Annotations[ClusterAnnotationKey] } -func (pc *PodReflectorController) syncCluster(key utils.QueueKey) error { +func (pc *PodReflectorController) syncCluster(key lifted.QueueKey) error { pc.lock.Lock() defer pc.lock.Unlock() clusterWideKey, exist := key.(keys.ClusterWideKey) @@ -212,7 +211,7 @@ func (pc *PodReflectorController) syncCluster(key utils.QueueKey) error { return nil } -func (pc *PodReflectorController) syncPod(key utils.QueueKey) error { +func (pc *PodReflectorController) syncPod(key lifted.QueueKey) error { pc.lock.RLock() defer pc.lock.RUnlock() fedKey, ok := key.(keys.FederatedKey) diff --git a/pkg/clustertree/cluster-manager/node-server/api/exec.go b/pkg/clustertree/cluster-manager/node-server/api/exec.go index 295772914..c5bfdfd42 100644 --- a/pkg/clustertree/cluster-manager/node-server/api/exec.go +++ b/pkg/clustertree/cluster-manager/node-server/api/exec.go @@ -19,6 +19,8 @@ import ( "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/node-server/api/remotecommand" ) +// todo move this file to pkg/utils/lifted + type execIO struct { tty bool stdin io.Reader diff --git a/pkg/utils/podutils/env.go b/pkg/utils/lifted/env.go similarity index 99% rename from pkg/utils/podutils/env.go rename to pkg/utils/lifted/env.go index 58a42ab4d..a0999459b 100644 --- a/pkg/utils/podutils/env.go +++ b/pkg/utils/lifted/env.go @@ -2,7 +2,7 @@ // For reference: // https://github.com/virtual-kubelet/virtual-kubelet/blob/master/internal/podutils/env.go -package podutils +package lifted import ( "context" diff --git a/pkg/utils/podutils/expand.go b/pkg/utils/lifted/expand.go similarity index 99% rename from pkg/utils/podutils/expand.go rename to pkg/utils/lifted/expand.go index 5f9bab3ed..9f9d621b6 100644 --- a/pkg/utils/podutils/expand.go +++ b/pkg/utils/lifted/expand.go @@ -3,7 +3,7 @@ // //This is to eliminate a direct dependency on kubernetes/kubernetes. -package podutils +package lifted import ( "bytes" diff --git a/pkg/utils/flags/ratelimiterflag.go b/pkg/utils/lifted/ratelimiterflag.go similarity index 89% rename from pkg/utils/flags/ratelimiterflag.go rename to pkg/utils/lifted/ratelimiterflag.go index 2ede36042..8ca6b74a6 100755 --- a/pkg/utils/flags/ratelimiterflag.go +++ b/pkg/utils/lifted/ratelimiterflag.go @@ -1,4 +1,4 @@ -package flags +package lifted import ( "time" @@ -12,8 +12,8 @@ import ( // For reference: // https://github.com/karmada-io/karmada/blob/release-1.5/pkg/sharedcli/ratelimiterflag/ratelimiterflag.go -// Options are options for rate limiter. -type Options struct { +// RateLimitOptions are options for rate limiter. +type RateLimitOptions struct { // RateLimiterBaseDelay is the base delay for ItemExponentialFailureRateLimiter. RateLimiterBaseDelay time.Duration @@ -28,7 +28,7 @@ type Options struct { } // AddFlags adds flags to the specified FlagSet. -func (o *Options) AddFlags(fs *pflag.FlagSet) { +func (o *RateLimitOptions) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&o.RateLimiterBaseDelay, "rate-limiter-base-delay", 5*time.Millisecond, "The base delay for rate limiter.") fs.DurationVar(&o.RateLimiterMaxDelay, "rate-limiter-max-delay", 1000*time.Second, "The max delay for rate limiter.") fs.IntVar(&o.RateLimiterQPS, "rate-limiter-qps", 10, "The QPS for rate limier.") @@ -36,7 +36,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { } // DefaultControllerRateLimiter provide a default rate limiter for controller, and users can tune it by corresponding flags. -func DefaultControllerRateLimiter(opts Options) workqueue.RateLimiter { +func DefaultControllerRateLimiter(opts RateLimitOptions) workqueue.RateLimiter { // set defaults if opts.RateLimiterBaseDelay <= 0 { opts.RateLimiterBaseDelay = 5 * time.Millisecond diff --git a/pkg/utils/worker.go b/pkg/utils/lifted/worker.go similarity index 94% rename from pkg/utils/worker.go rename to pkg/utils/lifted/worker.go index efdf57e2d..601767bf0 100755 --- a/pkg/utils/worker.go +++ b/pkg/utils/lifted/worker.go @@ -1,4 +1,4 @@ -package utils +package lifted import ( "time" @@ -8,8 +8,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - - "github.com/kosmos.io/kosmos/pkg/utils/flags" ) // This code is directly lifted from the karmada @@ -69,21 +67,21 @@ type asyncWorker struct { } // Options are the arguments for creating a new AsyncWorker. -type Options struct { +type WorkerOptions struct { // Name is the queue's name that will be used to emit metrics. // Defaults to "", which means disable metrics. Name string KeyFunc KeyFunc ReconcileFunc ReconcileFunc - RateLimiterOptions flags.Options + RateLimiterOptions RateLimitOptions } // NewAsyncWorker returns a asyncWorker which can process resource periodic. -func NewAsyncWorker(opt Options) AsyncWorker { +func NewAsyncWorker(opt WorkerOptions) AsyncWorker { return &asyncWorker{ keyFunc: opt.KeyFunc, reconcileFunc: opt.ReconcileFunc, - queue: workqueue.NewNamedRateLimitingQueue(flags.DefaultControllerRateLimiter(opt.RateLimiterOptions), opt.Name), + queue: workqueue.NewNamedRateLimitingQueue(DefaultControllerRateLimiter(opt.RateLimiterOptions), opt.Name), } } diff --git a/pkg/utils/manager/resource.go b/pkg/utils/manager/resource.go deleted file mode 100644 index 17dd6851c..000000000 --- a/pkg/utils/manager/resource.go +++ /dev/null @@ -1,46 +0,0 @@ -package manager - -import ( - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" - corev1listers "k8s.io/client-go/listers/core/v1" - "k8s.io/klog/v2" -) - -type ResourceManager struct { - podLister corev1listers.PodLister - secretLister corev1listers.SecretLister - configMapLister corev1listers.ConfigMapLister - serviceLister corev1listers.ServiceLister -} - -func NewResourceManager(podLister corev1listers.PodLister, secretLister corev1listers.SecretLister, configMapLister corev1listers.ConfigMapLister, serviceLister corev1listers.ServiceLister) (*ResourceManager, error) { - rm := ResourceManager{ - podLister: podLister, - secretLister: secretLister, - configMapLister: configMapLister, - serviceLister: serviceLister, - } - return &rm, nil -} - -func (rm *ResourceManager) GetPods() []*v1.Pod { - l, err := rm.podLister.List(labels.Everything()) - if err == nil { - return l - } - klog.Errorf("failed to fetch pods from lister: %v", err) - return make([]*v1.Pod, 0) -} - -func (rm *ResourceManager) GetConfigMap(name, namespace string) (*v1.ConfigMap, error) { - return rm.configMapLister.ConfigMaps(namespace).Get(name) -} - -func (rm *ResourceManager) GetSecret(name, namespace string) (*v1.Secret, error) { - return rm.secretLister.Secrets(namespace).Get(name) -} - -func (rm *ResourceManager) ListServices() ([]*v1.Service, error) { - return rm.serviceLister.List(labels.Everything()) -}