diff --git a/cmd/clusterlink/agent/app/agent.go b/cmd/clusterlink/agent/app/agent.go index 6f8d5132a..3d7d0cf83 100644 --- a/cmd/clusterlink/agent/app/agent.go +++ b/cmd/clusterlink/agent/app/agent.go @@ -70,10 +70,11 @@ func NewAgentCommand(ctx context.Context) *cobra.Command { } func run(ctx context.Context, opts *options.Options) error { - restConfig, err := clientcmd.BuildConfigFromFlags("", opts.KubeConfig) + restConfig, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.MasterUrl, opts.KubernetesOptions.KubeConfig) if err != nil { return fmt.Errorf("error building kubeconfig: %s", err.Error()) } + utils.SetQPSBurst(restConfig, opts.KubernetesOptions) if err = network.CreateGlobalNetIptablesChains(); err != nil { return fmt.Errorf("failed to create clusterlink iptables chains: %s", err.Error()) diff --git a/cmd/clusterlink/agent/app/options/options.go b/cmd/clusterlink/agent/app/options/options.go index 16916421f..f07df7121 100644 --- a/cmd/clusterlink/agent/app/options/options.go +++ b/cmd/clusterlink/agent/app/options/options.go @@ -4,10 +4,12 @@ import ( "time" "github.com/spf13/pflag" + + "github.com/kosmos.io/kosmos/pkg/utils" ) type Options struct { - KubeConfig string + KubernetesOptions utils.KubernetesOptions // CleanPeriod represents clusterlink-agent cleanup period CleanPeriod time.Duration @@ -15,9 +17,7 @@ type Options struct { // NewOptions builds a default agent options. func NewOptions() *Options { - return &Options{ - KubeConfig: "", - } + return &Options{} } // AddFlags adds flags of agent to the specified FlagSet @@ -26,6 +26,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { return } - fs.StringVar(&o.KubeConfig, "kubeconfig", o.KubeConfig, "Path to control plane kubeconfig file.") + fs.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultKubeQPS, "QPS to use while talking with kube-apiserver.") + fs.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultKubeBurst, "Burst to use while talking with kube-apiserver.") + fs.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path to control plane kubeconfig file.") + fs.StringVar(&o.KubernetesOptions.MasterUrl, "master-url", "", "Used to generate kubeconfig for downloading, if not specified, will use host in control plane kubeconfig.") fs.DurationVar(&o.CleanPeriod, "clean-period", 30*time.Second, "Specifies how often the agent cleans up routes and network interface.") } diff --git a/cmd/clusterlink/clusterlink-operator/app/operator.go b/cmd/clusterlink/clusterlink-operator/app/operator.go index 721f02bf4..57739a7e0 100644 --- a/cmd/clusterlink/clusterlink-operator/app/operator.go +++ b/cmd/clusterlink/clusterlink-operator/app/operator.go @@ -68,18 +68,20 @@ func NewLinkOperatorCommand(ctx context.Context) *cobra.Command { } func Run(ctx context.Context, opts *options.Options) error { - restConfig, err := clientcmd.BuildConfigFromFlags("", opts.KubeConfig) + restConfig, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.MasterUrl, opts.KubernetesOptions.KubeConfig) if err != nil { return fmt.Errorf("error building kubeconfig: %s", err.Error()) } + utils.SetQPSBurst(restConfig, opts.KubernetesOptions) + clientSet, err := kubernetes.NewForConfig(restConfig) if err != nil { return fmt.Errorf("error get kubeclient: %v", err) } var controlPanelKubeConfig *clientcmdapi.Config - if len(opts.ControlPanelKubeConfig) > 0 { - controlPanelKubeConfig, err = clientcmd.LoadFromFile(opts.ControlPanelKubeConfig) + if len(opts.KubernetesOptions.ControlpanelKubeConfig) > 0 { + controlPanelKubeConfig, err = clientcmd.LoadFromFile(opts.KubernetesOptions.ControlpanelKubeConfig) if err != nil { return fmt.Errorf("failed to load controlpanelKubeConfig: %v", err) } @@ -100,10 +102,11 @@ func Run(ctx context.Context, opts *options.Options) error { controlPanelKubeConfig = config } - c, err := clientcmd.BuildConfigFromFlags("", opts.ControlPanelKubeConfig) + c, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.ControlpanelMasterUrl, opts.KubernetesOptions.ControlpanelKubeConfig) if err != nil { return fmt.Errorf("error building kubeconfig: %s", err.Error()) } + utils.SetQPSBurst(c, opts.KubernetesOptions) mgr, err := ctrl.NewManager(c, ctrl.Options{ Scheme: scheme.NewSchema(), }) diff --git a/cmd/clusterlink/clusterlink-operator/app/options/options.go b/cmd/clusterlink/clusterlink-operator/app/options/options.go index ae352643e..c56261f0a 100644 --- a/cmd/clusterlink/clusterlink-operator/app/options/options.go +++ b/cmd/clusterlink/clusterlink-operator/app/options/options.go @@ -2,20 +2,19 @@ package options import ( "github.com/spf13/pflag" + + "github.com/kosmos.io/kosmos/pkg/utils" ) type Options struct { - KubeConfig string - ControlPanelKubeConfig string + KubernetesOptions utils.KubernetesOptions ExternalKubeConfigName string UseProxy bool } // NewOptions builds a default agent options. func NewOptions() *Options { - return &Options{ - KubeConfig: "", - } + return &Options{} } // AddFlags adds flags of estimator to the specified FlagSet @@ -23,8 +22,13 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { if o == nil { return } - fs.StringVar(&o.KubeConfig, "kubeconfig", "", "Path to control plane kubeconfig file.") - fs.StringVar(&o.ControlPanelKubeConfig, "controlpanelconfig", "", "Path to host control plane kubeconfig file.") + + fs.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultKubeQPS, "QPS to use while talking with kube-apiserver.") + fs.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultKubeBurst, "Burst to use while talking with kube-apiserver.") + fs.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path to host kubeconfig file.") + fs.StringVar(&o.KubernetesOptions.MasterUrl, "master-url", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.") + fs.StringVar(&o.KubernetesOptions.ControlpanelKubeConfig, "controlpanel-kubeconfig", "", "Path to control plane kubeconfig file.") + fs.StringVar(&o.KubernetesOptions.ControlpanelMasterUrl, "controlpanel-master-url", "", "Used to generate host control plane kubeconfig for downloading, if not specified, will use host in controlpanel-kubeconfig.") fs.StringVar(&o.ExternalKubeConfigName, "ExternalKubeConfigName", "external-kubeconfig", "external kube config name.") fs.BoolVar(&o.UseProxy, "UseProxy", false, "external kube config name.") } diff --git a/cmd/clusterlink/controller-manager/app/controller-manager.go b/cmd/clusterlink/controller-manager/app/controller-manager.go index b01a602fd..de26d2af9 100644 --- a/cmd/clusterlink/controller-manager/app/controller-manager.go +++ b/cmd/clusterlink/controller-manager/app/controller-manager.go @@ -17,6 +17,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/scheme" "github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag" + "github.com/kosmos.io/kosmos/pkg/utils" ) var ( @@ -80,6 +81,8 @@ func Run(ctx context.Context, opts *options.ControllerManagerOptions) error { panic(err) } + utils.SetQPSBurst(config, opts.KubernetesOptions) + controllerManager, err := ctrl.NewManager(config, ctrl.Options{ Scheme: scheme.NewSchema(), }) @@ -89,11 +92,12 @@ func Run(ctx context.Context, opts *options.ControllerManagerOptions) error { } //TODO 整理这块 - controlPanelConfig, err := clientcmd.BuildConfigFromFlags("", opts.ControlPanelConfig) + controlPanelConfig, err := clientcmd.BuildConfigFromFlags(opts.ControlpanelMasterUrl, opts.ControlpanelKubeConfig) if err != nil { klog.Fatalf("build controlpanel config err: %v", err) panic(err) } + utils.SetQPSBurst(controlPanelConfig, opts.KubernetesOptions) clusterLinkClient, err := versioned.NewForConfig(controlPanelConfig) if err != nil { @@ -111,11 +115,12 @@ func Run(ctx context.Context, opts *options.ControllerManagerOptions) error { } func setupControllers(ctx context.Context, mgr ctrl.Manager, opts *options.ControllerManagerOptions) []ctrlcontext.CleanFunc { - controlPanelConfig, err := clientcmd.BuildConfigFromFlags("", opts.ControlPanelConfig) + controlPanelConfig, err := clientcmd.BuildConfigFromFlags(opts.ControlpanelMasterUrl, opts.ControlpanelKubeConfig) if err != nil { klog.Fatalf("build controlpanel config err: %v", err) panic(err) } + utils.SetQPSBurst(controlPanelConfig, opts.KubernetesOptions) clusterLinkClient, err := versioned.NewForConfig(controlPanelConfig) if err != nil { diff --git a/cmd/clusterlink/controller-manager/app/options/options.go b/cmd/clusterlink/controller-manager/app/options/options.go index 113c8aa4a..221da9deb 100644 --- a/cmd/clusterlink/controller-manager/app/options/options.go +++ b/cmd/clusterlink/controller-manager/app/options/options.go @@ -17,11 +17,9 @@ type ControllerManagerOptions struct { RateLimiterOpts lifted.RateLimitOptions - ControlPanelConfig string - - KubeConfig string - ClusterName string + + utils.KubernetesOptions } // NewControllerManagerOptions builds a default controller manager options. @@ -36,10 +34,16 @@ func (o *ControllerManagerOptions) Validate() field.ErrorList { } func (o *ControllerManagerOptions) AddFlags(fs *pflag.FlagSet, allControllers, disabledByDefaultControllers []string) { + fs.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultKubeQPS, "QPS to use while talking with kube-apiserver.") + fs.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultKubeBurst, "Burst to use while talking with kube-apiserver.") + fs.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path to host kubeconfig file.") + fs.StringVar(&o.KubernetesOptions.MasterUrl, "master-url", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.") + fs.StringVar(&o.KubernetesOptions.ControlpanelKubeConfig, "controlpanel-kubeconfig", "", "Path to control plane kubeconfig file.") + fs.StringVar(&o.KubernetesOptions.ControlpanelMasterUrl, "controlpanel-master-url", "", "Used to generate host control plane kubeconfig for downloading, if not specified, will use host in control panel kubeconfig.") + fs.StringSliceVar(&o.Controllers, "controllers", []string{"*"}, fmt.Sprintf( "A list of controllers to enable. '*' enables all on-by-default controllers, 'foo' enables the controller named 'foo', '-foo' disables the controller named 'foo'. \nAll controllers: %s.\nDisabled-by-default controllers: %s", strings.Join(allControllers, ", "), strings.Join(disabledByDefaultControllers, ", "), )) fs.StringVar(&o.ClusterName, "cluster", os.Getenv(utils.EnvClusterName), "current cluster name.") - fs.StringVar(&o.ControlPanelConfig, "controlpanelconfig", "", "path to controlpanel kubeconfig file.") } diff --git a/cmd/clusterlink/elector/app/elector.go b/cmd/clusterlink/elector/app/elector.go index 670dc2b03..35913abb5 100644 --- a/cmd/clusterlink/elector/app/elector.go +++ b/cmd/clusterlink/elector/app/elector.go @@ -22,6 +22,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/sharedcli" "github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag" + "github.com/kosmos.io/kosmos/pkg/utils" ) // NewElectorCommand creates a *cobra.Command object with default parameters @@ -71,15 +72,18 @@ func NewElectorCommand(ctx context.Context) *cobra.Command { } func run(ctx context.Context, opts *options.Options) error { - memberClusterConfig, err := clientcmd.BuildConfigFromFlags("", opts.KubeConfig) + memberClusterConfig, err := clientcmd.BuildConfigFromFlags(opts.MasterUrl, opts.KubeConfig) if err != nil { return fmt.Errorf("error building kubeconfig: %+v", err) } + utils.SetQPSBurst(memberClusterConfig, opts.KubernetesOptions) - controlpanelConfig, err := clientcmd.BuildConfigFromFlags("", opts.ControlPanelConfig) + controlpanelConfig, err := clientcmd.BuildConfigFromFlags(opts.ControlpanelMasterUrl, opts.ControlpanelKubeConfig) if err != nil { return fmt.Errorf("error building controlpanelConfig: %+v", err) } + utils.SetQPSBurst(controlpanelConfig, opts.KubernetesOptions) + controlpanelClient, err := versioned.NewForConfig(controlpanelConfig) if err != nil { return fmt.Errorf("error building controlpanelClient: %+v", err) diff --git a/cmd/clusterlink/elector/app/options/options.go b/cmd/clusterlink/elector/app/options/options.go index 472d8e996..d8d3b4ce9 100644 --- a/cmd/clusterlink/elector/app/options/options.go +++ b/cmd/clusterlink/elector/app/options/options.go @@ -19,9 +19,8 @@ var ( ) type Options struct { - LeaderElection componentbaseconfig.LeaderElectionConfiguration - KubeConfig string - ControlPanelConfig string + LeaderElection componentbaseconfig.LeaderElectionConfiguration + utils.KubernetesOptions } // NewOptions builds a default elector options. @@ -39,13 +38,19 @@ func NewOptions() *Options { func (o *Options) Validate() field.ErrorList { errs := field.ErrorList{} newPath := field.NewPath("Options") - if len(o.ControlPanelConfig) == 0 { - errs = append(errs, field.Invalid(newPath.Child("controlpanelconfig"), o.ControlPanelConfig, "controlpanelconfig path should not empty")) + if len(o.KubernetesOptions.ControlpanelKubeConfig) == 0 { + errs = append(errs, field.Invalid(newPath.Child("controlpanelconfig"), o.ControlpanelKubeConfig, "controlpanelconfig path should not empty")) } return errs } func (o *Options) AddFlags(fs *pflag.FlagSet) { + fs.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultKubeQPS, "QPS to use while talking with kube-apiserver.") + fs.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultKubeBurst, "Burst to use while talking with kube-apiserver.") + fs.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path to host kubeconfig file.") + fs.StringVar(&o.KubernetesOptions.MasterUrl, "master-url", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.") + fs.StringVar(&o.KubernetesOptions.ControlpanelKubeConfig, "controlpanel-kubeconfig", "", "Path to control plane kubeconfig file.") + fs.StringVar(&o.KubernetesOptions.ControlpanelMasterUrl, "controlpanel-master-url", "", "Used to generate host control plane kubeconfig for downloading, if not specified, will use host in controlpanel-kubeconfig.") fs.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", true, "Enable leader election, which must be true when running multi instances.") fs.StringVar(&o.LeaderElection.ResourceName, "leader-elect-resource-name", "elector", "The name of resource object that is used for locking during leader election.") fs.StringVar(&o.LeaderElection.ResourceNamespace, "leader-elect-resource-namespace", utils.DefaultNamespace, "The namespace of resource object that is used for locking during leader election.") @@ -62,6 +67,4 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&o.LeaderElection.RetryPeriod.Duration, "leader-elect-retry-period", defaultElectionRetryPeriod.Duration, ""+ "The duration the clients should wait between attempting acquisition and renewal "+ "of a leadership. This is only applicable if leader election is enabled.") - fs.StringVar(&o.KubeConfig, "kubeconfig", "", "path to elector kubeconfig file.") - fs.StringVar(&o.ControlPanelConfig, "controlpanelconfig", "", "path to controlpanel kubeconfig file.") } diff --git a/cmd/clusterlink/floater/app/options/options.go b/cmd/clusterlink/floater/app/options/options.go index c70947421..bd0883dc5 100644 --- a/cmd/clusterlink/floater/app/options/options.go +++ b/cmd/clusterlink/floater/app/options/options.go @@ -9,7 +9,7 @@ import ( type Options struct { KubeConfig string - // CleanPeriod represents clusterlink-agent cleanup period + // CleanPeriod represents clusterlink-floater cleanup period CleanPeriod time.Duration } diff --git a/cmd/clusterlink/network-manager/app/manager.go b/cmd/clusterlink/network-manager/app/manager.go index ef46bba25..a22126c21 100644 --- a/cmd/clusterlink/network-manager/app/manager.go +++ b/cmd/clusterlink/network-manager/app/manager.go @@ -14,6 +14,7 @@ import ( networkmanager "github.com/kosmos.io/kosmos/pkg/clusterlink/network-manager" "github.com/kosmos.io/kosmos/pkg/scheme" "github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag" + "github.com/kosmos.io/kosmos/pkg/utils" ) func NewNetworkManagerCommand(ctx context.Context) *cobra.Command { @@ -45,11 +46,11 @@ func NewNetworkManagerCommand(ctx context.Context) *cobra.Command { } func run(ctx context.Context, opts *options.Options) error { - config, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.Master, opts.KubernetesOptions.KubeConfig) + config, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.MasterUrl, opts.KubernetesOptions.KubeConfig) if err != nil { panic(err) } - config.QPS, config.Burst = opts.KubernetesOptions.QPS, opts.KubernetesOptions.Burst + utils.SetQPSBurst(config, opts.KubernetesOptions) mgr, err := controllerruntime.NewManager(config, controllerruntime.Options{ Logger: klog.Background(), diff --git a/cmd/clusterlink/network-manager/app/options/options.go b/cmd/clusterlink/network-manager/app/options/options.go index 4c398cb1d..8d39dd7f4 100644 --- a/cmd/clusterlink/network-manager/app/options/options.go +++ b/cmd/clusterlink/network-manager/app/options/options.go @@ -9,15 +9,8 @@ import ( ) type Options struct { - LeaderElection componentbaseconfig.LeaderElectionConfiguration - KubernetesOptions KubernetesOptions -} - -type KubernetesOptions struct { - KubeConfig string `json:"kubeconfig" yaml:"kubeconfig"` - Master string `json:"master,omitempty" yaml:"master,omitempty"` - QPS float32 `json:"qps,omitempty" yaml:"qps,omitempty"` - Burst int `json:"burst,omitempty" yaml:"burst,omitempty"` + LeaderElection componentbaseconfig.LeaderElectionConfiguration + utils.KubernetesOptions } func NewOptions() *Options { @@ -39,8 +32,8 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { flags.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", true, "Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability.") flags.StringVar(&o.LeaderElection.ResourceName, "leader-elect-resource-name", "network-manager", "The name of resource object that is used for locking during leader election.") flags.StringVar(&o.LeaderElection.ResourceNamespace, "leader-elect-resource-namespace", utils.DefaultNamespace, "The namespace of resource object that is used for locking during leader election.") - flags.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", 40.0, "QPS to use while talking with kube-apiserver.") - flags.IntVar(&o.KubernetesOptions.Burst, "kube-burst", 60, "Burst to use while talking with kube-apiserver.") - flags.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path for kubernetes kubeconfig file, if left blank, will use in cluster way.") - flags.StringVar(&o.KubernetesOptions.Master, "master", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.") + flags.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultTreeAndNetManagerKubeQPS, "QPS to use while talking with kube-apiserver.") + flags.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultTreeAndNetManagerKubeBurst, "Burst to use while talking with kube-apiserver.") + flags.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path to host kubeconfig file.") + flags.StringVar(&o.KubernetesOptions.MasterUrl, "master-url", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.") } diff --git a/cmd/clustertree/cluster-manager/app/extersion_apps.go b/cmd/clustertree/cluster-manager/app/extersion_apps.go index cab4ebfbd..1fdfcd1de 100644 --- a/cmd/clustertree/cluster-manager/app/extersion_apps.go +++ b/cmd/clustertree/cluster-manager/app/extersion_apps.go @@ -15,15 +15,18 @@ import ( "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/extensions/daemonset" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" + "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) // StartHostDaemonSetsController starts a new HostDaemonSetsController. func StartHostDaemonSetsController(ctx context.Context, opts *options.Options, workNum int) (*daemonset.HostDaemonSetsController, error) { - kubeconfig, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.Master, opts.KubernetesOptions.KubeConfig) + kubeconfig, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.MasterUrl, opts.KubernetesOptions.KubeConfig) if err != nil { klog.Errorf("Unable to build kubeconfig: %v", err) } + utils.SetQPSBurst(kubeconfig, opts.KubernetesOptions) + kubeClient, err := clientset.NewForConfig(kubeconfig) if err != nil { klog.Errorf("Unable to create kubeClient: %v", err) @@ -56,11 +59,13 @@ func StartHostDaemonSetsController(ctx context.Context, opts *options.Options, w } func StartDistributeController(ctx context.Context, opts *options.Options, workNum int) (*daemonset.DistributeController, error) { - kubeconfig, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.Master, opts.KubernetesOptions.KubeConfig) + kubeconfig, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.MasterUrl, opts.KubernetesOptions.KubeConfig) if err != nil { //klog.Errorf("Unable to build kubeconfig: %v", err) klog.Errorf("Unable to build kubeconfig: %v", err) } + utils.SetQPSBurst(kubeconfig, opts.KubernetesOptions) + kosmosClient, err := versioned.NewForConfig(kubeconfig) if err != nil { klog.Errorf("Unable to create kosmosClient: %v", err) @@ -82,10 +87,12 @@ func StartDistributeController(ctx context.Context, opts *options.Options, workN } func StartDaemonSetsController(ctx context.Context, opts *options.Options, workNum int) (*daemonset.DaemonSetsController, error) { - kubeconfig, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.Master, opts.KubernetesOptions.KubeConfig) + kubeconfig, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.MasterUrl, opts.KubernetesOptions.KubeConfig) if err != nil { klog.Errorf("Unable to build kubeconfig: %v", err) } + utils.SetQPSBurst(kubeconfig, opts.KubernetesOptions) + kubeClient, err := clientset.NewForConfig(kubeconfig) if err != nil { klog.Errorf("Unable to create kubeClient: %v", err) @@ -115,10 +122,12 @@ func StartDaemonSetsController(ctx context.Context, opts *options.Options, workN } func StartDaemonSetsMirrorController(ctx context.Context, opts *options.Options, workNum int) (*daemonset.DaemonSetsMirrorController, error) { - kubeconfig, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.Master, opts.KubernetesOptions.KubeConfig) + kubeconfig, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.MasterUrl, opts.KubernetesOptions.KubeConfig) if err != nil { klog.Errorf("Unable to build kubeconfig: %v", err) } + utils.SetQPSBurst(kubeconfig, opts.KubernetesOptions) + kubeClient, err := clientset.NewForConfig(kubeconfig) if err != nil { klog.Errorf("Unable to create kubeClient: %v", err) @@ -146,10 +155,12 @@ func StartDaemonSetsMirrorController(ctx context.Context, opts *options.Options, } func StartPodReflectController(ctx context.Context, opts *options.Options, workNum int) (*daemonset.PodReflectorController, error) { - kubeconfig, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.Master, opts.KubernetesOptions.KubeConfig) + kubeconfig, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.MasterUrl, opts.KubernetesOptions.KubeConfig) if err != nil { klog.Errorf("Unable to build kubeconfig: %v", err) } + utils.SetQPSBurst(kubeconfig, opts.KubernetesOptions) + kubeClient, err := clientset.NewForConfig(kubeconfig) if err != nil { klog.Errorf("Unable to create kubeClient: %v", err) diff --git a/cmd/clustertree/cluster-manager/app/manager.go b/cmd/clustertree/cluster-manager/app/manager.go index 0e7f4651f..5e5cc57c2 100644 --- a/cmd/clustertree/cluster-manager/app/manager.go +++ b/cmd/clustertree/cluster-manager/app/manager.go @@ -68,10 +68,11 @@ func leaderElectionRun(ctx context.Context, opts *options.Options) error { return run(ctx, opts) } - kubeConfig, err := clientcmd.BuildConfigFromFlags("", opts.KubernetesOptions.KubeConfig) + kubeConfig, err := clientcmd.BuildConfigFromFlags(opts.MasterUrl, opts.KubeConfig) if err != nil { return err } + utils.SetQPSBurst(kubeConfig, opts.KubernetesOptions) id, err := os.Hostname() if err != nil { @@ -119,7 +120,7 @@ func run(ctx context.Context, opts *options.Options) error { globalLeafResourceManager := leafUtils.GetGlobalLeafResourceManager() globalLeafClientManager := leafUtils.GetGlobalLeafClientResourceManager() - config, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.Master, opts.KubernetesOptions.KubeConfig) + config, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.MasterUrl, opts.KubernetesOptions.KubeConfig) if err != nil { panic(err) } diff --git a/cmd/clustertree/cluster-manager/app/options/options.go b/cmd/clustertree/cluster-manager/app/options/options.go index 653463789..1f4d76585 100644 --- a/cmd/clustertree/cluster-manager/app/options/options.go +++ b/cmd/clustertree/cluster-manager/app/options/options.go @@ -9,6 +9,7 @@ import ( "k8s.io/component-base/config/options" componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) @@ -17,16 +18,13 @@ const ( LeaderElectionNamespace = "kosmos-system" LeaderElectionResourceName = "cluster-manager" - DefaultKubeQPS = 40.0 - DefaultKubeBurst = 60 - CoreDNSServiceNamespace = "kube-system" CoreDNSServiceName = "kube-dns" ) type Options struct { - LeaderElection componentbaseconfig.LeaderElectionConfiguration - KubernetesOptions KubernetesOptions + LeaderElection componentbaseconfig.LeaderElectionConfiguration + utils.KubernetesOptions ListenPort int32 DaemonSetController bool MultiClusterService bool @@ -54,13 +52,6 @@ type Options struct { SyncPeriod time.Duration } -type KubernetesOptions struct { - KubeConfig string - Master string - QPS float32 - Burst int -} - func NewOptions() (*Options, error) { var leaderElection componentbaseconfigv1alpha1.LeaderElectionConfiguration componentbaseconfigv1alpha1.RecommendedDefaultLeaderElectionConfiguration(&leaderElection) @@ -82,10 +73,10 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { return } - flags.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", DefaultKubeQPS, "QPS to use while talking with kube-apiserver.") - flags.IntVar(&o.KubernetesOptions.Burst, "kube-burst", DefaultKubeBurst, "Burst to use while talking with kube-apiserver.") + flags.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultTreeAndNetManagerKubeQPS, "QPS to use while talking with kube-apiserver.") + flags.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultTreeAndNetManagerKubeBurst, "Burst to use while talking with kube-apiserver.") flags.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path for kubernetes kubeconfig file, if left blank, will use in cluster way.") - flags.StringVar(&o.KubernetesOptions.Master, "master", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.") + flags.StringVar(&o.KubernetesOptions.MasterUrl, "master", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.") flags.Int32Var(&o.ListenPort, "listen-port", 10250, "Listen port for requests from the kube-apiserver.") flags.BoolVar(&o.DaemonSetController, "daemonset-controller", false, "Turn on or off daemonset controller.") flags.BoolVar(&o.MultiClusterService, "multi-cluster-service", false, "Turn on or off mcs support.") diff --git a/pkg/utils/kube_client.go b/pkg/utils/kube_client.go index 65ca71e68..7bb956350 100644 --- a/pkg/utils/kube_client.go +++ b/pkg/utils/kube_client.go @@ -18,6 +18,23 @@ var ( defaultKubeConfig = filepath.Join(homedir.HomeDir(), ".kube", "config") ) +const ( + DefaultKubeQPS = 5.0 + DefaultKubeBurst = 10 + + DefaultTreeAndNetManagerKubeQPS = 40.0 + DefaultTreeAndNetManagerKubeBurst = 60 +) + +type KubernetesOptions struct { + KubeConfig string `json:"kubeconfig" yaml:"kubeconfig"` + MasterUrl string `json:"masterUrl,omitempty" yaml:"masterUrl,omitempty"` + ControlpanelKubeConfig string `json:"controlpanelKubeConfig,omitempty" yaml:"controlpanelKubeConfig,omitempty"` + ControlpanelMasterUrl string `json:"controlpanelMasterUrl,omitempty" yaml:"controlpanelMasterUrl,omitempty"` + QPS float32 `json:"qps,omitempty" yaml:"qps,omitempty"` + Burst int `json:"burst,omitempty" yaml:"burst,omitempty"` +} + func loadKubeconfig(kubeconfigPath, context string) (*clientcmdapi.Config, error) { if kubeconfigPath == "" { kubeconfigPath = GetEnvString("KUBECONFIG", defaultKubeConfig) @@ -128,6 +145,11 @@ func NewClientFromConfigPath(configPath string, opts ...Opts) (kubernetes.Interf return client, nil } +func SetQPSBurst(config *rest.Config, options KubernetesOptions) { + config.QPS = options.QPS + config.Burst = options.Burst +} + func NewKosmosClientFromConfigPath(configPath string, opts ...Opts) (kosmosversioned.Interface, error) { var ( config *rest.Config diff --git a/pkg/utils/kube_client_test.go b/pkg/utils/kube_client_test.go new file mode 100644 index 000000000..f50b7c32f --- /dev/null +++ b/pkg/utils/kube_client_test.go @@ -0,0 +1,145 @@ +package utils + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +func TestSetQPSBurst(t *testing.T) { + testCases := []struct { + name string + KubernetesOptions + expectedQPS float32 + expectedBurst int + numRequests int + expectedMinTime float64 + expectedMaxTime float64 + }{ + { + name: "numRequests is 60", + KubernetesOptions: KubernetesOptions{ + KubeConfig: "", + MasterUrl: "", + QPS: 5.0, + Burst: 10, + }, + numRequests: 60, + expectedQPS: 5.0, + expectedBurst: 10, + expectedMinTime: 6, + expectedMaxTime: 12, + }, + { + name: "numRequests is 600", + KubernetesOptions: KubernetesOptions{ + KubeConfig: "", + MasterUrl: "", + QPS: 40.0, + Burst: 60, + }, + numRequests: 600, + expectedQPS: 40.0, + expectedBurst: 60, + expectedMinTime: 10, + expectedMaxTime: 15, + }, + { + name: "QPS is 1, Burst is 1", + KubernetesOptions: KubernetesOptions{ + KubeConfig: "", + MasterUrl: "", + QPS: 1.0, + Burst: 1, + }, + numRequests: 30, + expectedQPS: 1.0, + expectedBurst: 1, + expectedMinTime: 29, + expectedMaxTime: 31, + }, + { + name: "QPS is 0, Burst is 0", + KubernetesOptions: KubernetesOptions{ + KubeConfig: "", + MasterUrl: "", + QPS: 0.0, + Burst: 0, + }, + numRequests: 60, + expectedQPS: 5.0, + expectedBurst: 10, + expectedMinTime: 6, + expectedMaxTime: 12, + }, + { + name: "not set", + KubernetesOptions: KubernetesOptions{ + KubeConfig: "", + MasterUrl: "", + }, + numRequests: 60, + expectedQPS: 5.0, + expectedBurst: 10, + expectedMinTime: 6, + expectedMaxTime: 12, + }, + } + + restConfig, err := clientcmd.BuildConfigFromFlags("", "/Users/george/.kube/config") + if err != nil { + panic(err) + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + options := KubernetesOptions{ + QPS: tc.expectedQPS, + Burst: tc.expectedBurst, + } + SetQPSBurst(restConfig, options) + + // create client + clientset, err := kubernetes.NewForConfig(restConfig) + if err != nil { + fmt.Printf("error creating clientset: %s\n", err) + return + } + + ctx := context.Background() + var wg sync.WaitGroup + // simulate concurrent requests + numRequests := tc.numRequests + + start := time.Now() + for i := 0; i < numRequests; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := clientset.CoreV1().Pods("").List(ctx, v1.ListOptions{}) + if err != nil { + fmt.Printf("request failed: %s\n", err) + } + }() + } + + // Wait for all requests to complete + wg.Wait() + elapsed := time.Since(start) + fmt.Printf("All requests completed, Execution time: %s\n", elapsed) + + assert.Equal(t, tc.expectedQPS, restConfig.QPS) + assert.Equal(t, tc.expectedBurst, restConfig.Burst) + seconds := elapsed.Seconds() + assert.Condition(t, func() bool { + return seconds > tc.expectedMinTime && seconds < tc.expectedMaxTime + }, "seconds %f should be greater than %f and less than %f", seconds, tc.expectedMinTime, tc.expectedMaxTime) + }) + } +}