diff --git a/cmd/clusterlink/agent/app/agent.go b/cmd/clusterlink/agent/app/agent.go index 6f8d5132a..a31badcad 100644 --- a/cmd/clusterlink/agent/app/agent.go +++ b/cmd/clusterlink/agent/app/agent.go @@ -70,11 +70,13 @@ func NewAgentCommand(ctx context.Context) *cobra.Command { } func run(ctx context.Context, opts *options.Options) error { - restConfig, err := clientcmd.BuildConfigFromFlags("", opts.KubeConfig) + restConfig, err := clientcmd.BuildConfigFromFlags("", opts.KubernetesOptions.KubeConfig) if err != nil { return fmt.Errorf("error building kubeconfig: %s", err.Error()) } + utils.SetQPSBurst(restConfig, opts.KubernetesOptions) + if err = network.CreateGlobalNetIptablesChains(); err != nil { return fmt.Errorf("failed to create clusterlink iptables chains: %s", err.Error()) } diff --git a/cmd/clusterlink/agent/app/options/options.go b/cmd/clusterlink/agent/app/options/options.go index 16916421f..d6811a36d 100644 --- a/cmd/clusterlink/agent/app/options/options.go +++ b/cmd/clusterlink/agent/app/options/options.go @@ -4,10 +4,12 @@ import ( "time" "github.com/spf13/pflag" + + "github.com/kosmos.io/kosmos/pkg/utils" ) type Options struct { - KubeConfig string + KubernetesOptions utils.KubernetesOptions // CleanPeriod represents clusterlink-agent cleanup period CleanPeriod time.Duration @@ -15,9 +17,7 @@ type Options struct { // NewOptions builds a default agent options. func NewOptions() *Options { - return &Options{ - KubeConfig: "", - } + return &Options{} } // AddFlags adds flags of agent to the specified FlagSet @@ -26,6 +26,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { return } - fs.StringVar(&o.KubeConfig, "kubeconfig", o.KubeConfig, "Path to control plane kubeconfig file.") + fs.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultKubeQPS, "QPS to use while talking with kube-apiserver.") + fs.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultKubeBurst, "Burst to use while talking with kube-apiserver.") + fs.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path to control plane kubeconfig file.") fs.DurationVar(&o.CleanPeriod, "clean-period", 30*time.Second, "Specifies how often the agent cleans up routes and network interface.") } diff --git a/cmd/clusterlink/clusterlink-operator/app/operator.go b/cmd/clusterlink/clusterlink-operator/app/operator.go index 721f02bf4..77cc91bac 100644 --- a/cmd/clusterlink/clusterlink-operator/app/operator.go +++ b/cmd/clusterlink/clusterlink-operator/app/operator.go @@ -68,10 +68,13 @@ func NewLinkOperatorCommand(ctx context.Context) *cobra.Command { } func Run(ctx context.Context, opts *options.Options) error { - restConfig, err := clientcmd.BuildConfigFromFlags("", opts.KubeConfig) + restConfig, err := clientcmd.BuildConfigFromFlags("", opts.KubernetesOptions.KubeConfig) if err != nil { return fmt.Errorf("error building kubeconfig: %s", err.Error()) } + + utils.SetQPSBurst(restConfig, opts.KubernetesOptions) + clientSet, err := kubernetes.NewForConfig(restConfig) if err != nil { return fmt.Errorf("error get kubeclient: %v", err) diff --git a/cmd/clusterlink/clusterlink-operator/app/options/options.go b/cmd/clusterlink/clusterlink-operator/app/options/options.go index ae352643e..d9ab8a072 100644 --- a/cmd/clusterlink/clusterlink-operator/app/options/options.go +++ b/cmd/clusterlink/clusterlink-operator/app/options/options.go @@ -2,10 +2,12 @@ package options import ( "github.com/spf13/pflag" + + "github.com/kosmos.io/kosmos/pkg/utils" ) type Options struct { - KubeConfig string + KubernetesOptions utils.KubernetesOptions ControlPanelKubeConfig string ExternalKubeConfigName string UseProxy bool @@ -13,9 +15,7 @@ type Options struct { // NewOptions builds a default agent options. func NewOptions() *Options { - return &Options{ - KubeConfig: "", - } + return &Options{} } // AddFlags adds flags of estimator to the specified FlagSet @@ -23,7 +23,10 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { if o == nil { return } - fs.StringVar(&o.KubeConfig, "kubeconfig", "", "Path to control plane kubeconfig file.") + + fs.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultKubeQPS, "QPS to use while talking with kube-apiserver.") + fs.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultKubeBurst, "Burst to use while talking with kube-apiserver.") + fs.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path to control plane kubeconfig file.") fs.StringVar(&o.ControlPanelKubeConfig, "controlpanelconfig", "", "Path to host control plane kubeconfig file.") fs.StringVar(&o.ExternalKubeConfigName, "ExternalKubeConfigName", "external-kubeconfig", "external kube config name.") fs.BoolVar(&o.UseProxy, "UseProxy", false, "external kube config name.") diff --git a/cmd/clusterlink/controller-manager/app/controller-manager.go b/cmd/clusterlink/controller-manager/app/controller-manager.go index b01a602fd..75dc152f2 100644 --- a/cmd/clusterlink/controller-manager/app/controller-manager.go +++ b/cmd/clusterlink/controller-manager/app/controller-manager.go @@ -17,6 +17,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/scheme" "github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag" + "github.com/kosmos.io/kosmos/pkg/utils" ) var ( @@ -80,6 +81,8 @@ func Run(ctx context.Context, opts *options.ControllerManagerOptions) error { panic(err) } + utils.SetQPSBurst(config, opts.KubernetesOptions) + controllerManager, err := ctrl.NewManager(config, ctrl.Options{ Scheme: scheme.NewSchema(), }) @@ -95,6 +98,8 @@ func Run(ctx context.Context, opts *options.ControllerManagerOptions) error { panic(err) } + utils.SetQPSBurst(controlPanelConfig, opts.KubernetesOptions) + clusterLinkClient, err := versioned.NewForConfig(controlPanelConfig) if err != nil { klog.Fatalf("Unable to create clusterlinkClient: %v", err) diff --git a/cmd/clusterlink/controller-manager/app/options/options.go b/cmd/clusterlink/controller-manager/app/options/options.go index 113c8aa4a..6f49ed1e3 100644 --- a/cmd/clusterlink/controller-manager/app/options/options.go +++ b/cmd/clusterlink/controller-manager/app/options/options.go @@ -19,9 +19,9 @@ type ControllerManagerOptions struct { ControlPanelConfig string - KubeConfig string - ClusterName string + + utils.KubernetesOptions } // NewControllerManagerOptions builds a default controller manager options. @@ -36,6 +36,8 @@ func (o *ControllerManagerOptions) Validate() field.ErrorList { } func (o *ControllerManagerOptions) AddFlags(fs *pflag.FlagSet, allControllers, disabledByDefaultControllers []string) { + fs.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultKubeQPS, "QPS to use while talking with kube-apiserver.") + fs.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultKubeBurst, "Burst to use while talking with kube-apiserver.") fs.StringSliceVar(&o.Controllers, "controllers", []string{"*"}, fmt.Sprintf( "A list of controllers to enable. '*' enables all on-by-default controllers, 'foo' enables the controller named 'foo', '-foo' disables the controller named 'foo'. \nAll controllers: %s.\nDisabled-by-default controllers: %s", strings.Join(allControllers, ", "), strings.Join(disabledByDefaultControllers, ", "), diff --git a/cmd/clusterlink/elector/app/elector.go b/cmd/clusterlink/elector/app/elector.go index 670dc2b03..9c8a0bce7 100644 --- a/cmd/clusterlink/elector/app/elector.go +++ b/cmd/clusterlink/elector/app/elector.go @@ -22,6 +22,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/sharedcli" "github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag" + "github.com/kosmos.io/kosmos/pkg/utils" ) // NewElectorCommand creates a *cobra.Command object with default parameters @@ -76,10 +77,15 @@ func run(ctx context.Context, opts *options.Options) error { return fmt.Errorf("error building kubeconfig: %+v", err) } + utils.SetQPSBurst(memberClusterConfig, opts.KubernetesOptions) + controlpanelConfig, err := clientcmd.BuildConfigFromFlags("", opts.ControlPanelConfig) if err != nil { return fmt.Errorf("error building controlpanelConfig: %+v", err) } + + utils.SetQPSBurst(controlpanelConfig, opts.KubernetesOptions) + controlpanelClient, err := versioned.NewForConfig(controlpanelConfig) if err != nil { return fmt.Errorf("error building controlpanelClient: %+v", err) diff --git a/cmd/clusterlink/elector/app/options/options.go b/cmd/clusterlink/elector/app/options/options.go index 472d8e996..511aa6bbd 100644 --- a/cmd/clusterlink/elector/app/options/options.go +++ b/cmd/clusterlink/elector/app/options/options.go @@ -20,8 +20,8 @@ var ( type Options struct { LeaderElection componentbaseconfig.LeaderElectionConfiguration - KubeConfig string ControlPanelConfig string + utils.KubernetesOptions } // NewOptions builds a default elector options. @@ -46,6 +46,8 @@ func (o *Options) Validate() field.ErrorList { } func (o *Options) AddFlags(fs *pflag.FlagSet) { + fs.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultKubeQPS, "QPS to use while talking with kube-apiserver.") + fs.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultKubeBurst, "Burst to use while talking with kube-apiserver.") fs.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", true, "Enable leader election, which must be true when running multi instances.") fs.StringVar(&o.LeaderElection.ResourceName, "leader-elect-resource-name", "elector", "The name of resource object that is used for locking during leader election.") fs.StringVar(&o.LeaderElection.ResourceNamespace, "leader-elect-resource-namespace", utils.DefaultNamespace, "The namespace of resource object that is used for locking during leader election.") @@ -62,6 +64,6 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&o.LeaderElection.RetryPeriod.Duration, "leader-elect-retry-period", defaultElectionRetryPeriod.Duration, ""+ "The duration the clients should wait between attempting acquisition and renewal "+ "of a leadership. This is only applicable if leader election is enabled.") - fs.StringVar(&o.KubeConfig, "kubeconfig", "", "path to elector kubeconfig file.") + fs.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "path to elector kubeconfig file.") fs.StringVar(&o.ControlPanelConfig, "controlpanelconfig", "", "path to controlpanel kubeconfig file.") } diff --git a/cmd/clusterlink/floater/app/options/options.go b/cmd/clusterlink/floater/app/options/options.go index c70947421..bd0883dc5 100644 --- a/cmd/clusterlink/floater/app/options/options.go +++ b/cmd/clusterlink/floater/app/options/options.go @@ -9,7 +9,7 @@ import ( type Options struct { KubeConfig string - // CleanPeriod represents clusterlink-agent cleanup period + // CleanPeriod represents clusterlink-floater cleanup period CleanPeriod time.Duration } diff --git a/cmd/clusterlink/network-manager/app/manager.go b/cmd/clusterlink/network-manager/app/manager.go index ef46bba25..9a9388f1a 100644 --- a/cmd/clusterlink/network-manager/app/manager.go +++ b/cmd/clusterlink/network-manager/app/manager.go @@ -14,6 +14,7 @@ import ( networkmanager "github.com/kosmos.io/kosmos/pkg/clusterlink/network-manager" "github.com/kosmos.io/kosmos/pkg/scheme" "github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag" + "github.com/kosmos.io/kosmos/pkg/utils" ) func NewNetworkManagerCommand(ctx context.Context) *cobra.Command { @@ -49,7 +50,8 @@ func run(ctx context.Context, opts *options.Options) error { if err != nil { panic(err) } - config.QPS, config.Burst = opts.KubernetesOptions.QPS, opts.KubernetesOptions.Burst + + utils.SetQPSBurst(config, opts.KubernetesOptions) mgr, err := controllerruntime.NewManager(config, controllerruntime.Options{ Logger: klog.Background(), diff --git a/cmd/clusterlink/network-manager/app/options/options.go b/cmd/clusterlink/network-manager/app/options/options.go index 4c398cb1d..b80c3c8d0 100644 --- a/cmd/clusterlink/network-manager/app/options/options.go +++ b/cmd/clusterlink/network-manager/app/options/options.go @@ -9,15 +9,8 @@ import ( ) type Options struct { - LeaderElection componentbaseconfig.LeaderElectionConfiguration - KubernetesOptions KubernetesOptions -} - -type KubernetesOptions struct { - KubeConfig string `json:"kubeconfig" yaml:"kubeconfig"` - Master string `json:"master,omitempty" yaml:"master,omitempty"` - QPS float32 `json:"qps,omitempty" yaml:"qps,omitempty"` - Burst int `json:"burst,omitempty" yaml:"burst,omitempty"` + LeaderElection componentbaseconfig.LeaderElectionConfiguration + utils.KubernetesOptions } func NewOptions() *Options { diff --git a/cmd/clustertree/cluster-manager/app/extersion_apps.go b/cmd/clustertree/cluster-manager/app/extersion_apps.go index cab4ebfbd..2db886a24 100644 --- a/cmd/clustertree/cluster-manager/app/extersion_apps.go +++ b/cmd/clustertree/cluster-manager/app/extersion_apps.go @@ -15,6 +15,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/extensions/daemonset" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" + "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) @@ -24,6 +25,9 @@ func StartHostDaemonSetsController(ctx context.Context, opts *options.Options, w if err != nil { klog.Errorf("Unable to build kubeconfig: %v", err) } + + utils.SetQPSBurst(kubeconfig, opts.KubernetesOptions) + kubeClient, err := clientset.NewForConfig(kubeconfig) if err != nil { klog.Errorf("Unable to create kubeClient: %v", err) @@ -61,6 +65,9 @@ func StartDistributeController(ctx context.Context, opts *options.Options, workN //klog.Errorf("Unable to build kubeconfig: %v", err) klog.Errorf("Unable to build kubeconfig: %v", err) } + + utils.SetQPSBurst(kubeconfig, opts.KubernetesOptions) + kosmosClient, err := versioned.NewForConfig(kubeconfig) if err != nil { klog.Errorf("Unable to create kosmosClient: %v", err) @@ -86,6 +93,9 @@ func StartDaemonSetsController(ctx context.Context, opts *options.Options, workN if err != nil { klog.Errorf("Unable to build kubeconfig: %v", err) } + + utils.SetQPSBurst(kubeconfig, opts.KubernetesOptions) + kubeClient, err := clientset.NewForConfig(kubeconfig) if err != nil { klog.Errorf("Unable to create kubeClient: %v", err) @@ -119,6 +129,9 @@ func StartDaemonSetsMirrorController(ctx context.Context, opts *options.Options, if err != nil { klog.Errorf("Unable to build kubeconfig: %v", err) } + + utils.SetQPSBurst(kubeconfig, opts.KubernetesOptions) + kubeClient, err := clientset.NewForConfig(kubeconfig) if err != nil { klog.Errorf("Unable to create kubeClient: %v", err) @@ -150,6 +163,9 @@ func StartPodReflectController(ctx context.Context, opts *options.Options, workN if err != nil { klog.Errorf("Unable to build kubeconfig: %v", err) } + + utils.SetQPSBurst(kubeconfig, opts.KubernetesOptions) + kubeClient, err := clientset.NewForConfig(kubeconfig) if err != nil { klog.Errorf("Unable to create kubeClient: %v", err) diff --git a/cmd/clustertree/cluster-manager/app/options/options.go b/cmd/clustertree/cluster-manager/app/options/options.go index 653463789..f2c4a965e 100644 --- a/cmd/clustertree/cluster-manager/app/options/options.go +++ b/cmd/clustertree/cluster-manager/app/options/options.go @@ -9,6 +9,7 @@ import ( "k8s.io/component-base/config/options" componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) @@ -17,16 +18,13 @@ const ( LeaderElectionNamespace = "kosmos-system" LeaderElectionResourceName = "cluster-manager" - DefaultKubeQPS = 40.0 - DefaultKubeBurst = 60 - CoreDNSServiceNamespace = "kube-system" CoreDNSServiceName = "kube-dns" ) type Options struct { - LeaderElection componentbaseconfig.LeaderElectionConfiguration - KubernetesOptions KubernetesOptions + LeaderElection componentbaseconfig.LeaderElectionConfiguration + utils.KubernetesOptions ListenPort int32 DaemonSetController bool MultiClusterService bool @@ -54,13 +52,6 @@ type Options struct { SyncPeriod time.Duration } -type KubernetesOptions struct { - KubeConfig string - Master string - QPS float32 - Burst int -} - func NewOptions() (*Options, error) { var leaderElection componentbaseconfigv1alpha1.LeaderElectionConfiguration componentbaseconfigv1alpha1.RecommendedDefaultLeaderElectionConfiguration(&leaderElection) @@ -82,8 +73,8 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { return } - flags.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", DefaultKubeQPS, "QPS to use while talking with kube-apiserver.") - flags.IntVar(&o.KubernetesOptions.Burst, "kube-burst", DefaultKubeBurst, "Burst to use while talking with kube-apiserver.") + flags.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultClusterTreeKubeQPS, "QPS to use while talking with kube-apiserver.") + flags.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultClusterTreeKubeBurst, "Burst to use while talking with kube-apiserver.") flags.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path for kubernetes kubeconfig file, if left blank, will use in cluster way.") flags.StringVar(&o.KubernetesOptions.Master, "master", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.") flags.Int32Var(&o.ListenPort, "listen-port", 10250, "Listen port for requests from the kube-apiserver.") diff --git a/pkg/utils/kube_client.go b/pkg/utils/kube_client.go index 65ca71e68..b98d20283 100644 --- a/pkg/utils/kube_client.go +++ b/pkg/utils/kube_client.go @@ -18,6 +18,21 @@ var ( defaultKubeConfig = filepath.Join(homedir.HomeDir(), ".kube", "config") ) +const ( + DefaultKubeQPS = 5.0 + DefaultKubeBurst = 10 + + DefaultClusterTreeKubeQPS = 40.0 + DefaultClusterTreeKubeBurst = 60 +) + +type KubernetesOptions struct { + KubeConfig string `json:"kubeconfig" yaml:"kubeconfig"` + Master string `json:"master,omitempty" yaml:"master,omitempty"` + QPS float32 `json:"qps,omitempty" yaml:"qps,omitempty"` + Burst int `json:"burst,omitempty" yaml:"burst,omitempty"` +} + func loadKubeconfig(kubeconfigPath, context string) (*clientcmdapi.Config, error) { if kubeconfigPath == "" { kubeconfigPath = GetEnvString("KUBECONFIG", defaultKubeConfig) @@ -128,6 +143,11 @@ func NewClientFromConfigPath(configPath string, opts ...Opts) (kubernetes.Interf return client, nil } +func SetQPSBurst(config *rest.Config, options KubernetesOptions) { + config.QPS = options.QPS + config.Burst = options.Burst +} + func NewKosmosClientFromConfigPath(configPath string, opts ...Opts) (kosmosversioned.Interface, error) { var ( config *rest.Config diff --git a/pkg/utils/kube_client_test.go b/pkg/utils/kube_client_test.go new file mode 100644 index 000000000..f5b789d28 --- /dev/null +++ b/pkg/utils/kube_client_test.go @@ -0,0 +1,145 @@ +package utils + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +func TestSetQPSBurst(t *testing.T) { + testCases := []struct { + name string + KubernetesOptions + expectedQPS float32 + expectedBurst int + numRequests int + expectedMinTime float64 + expectedMaxTime float64 + }{ + { + name: "numRequests is 60", + KubernetesOptions: KubernetesOptions{ + KubeConfig: "", + Master: "", + QPS: 5.0, + Burst: 10, + }, + numRequests: 60, + expectedQPS: 5.0, + expectedBurst: 10, + expectedMinTime: 6, + expectedMaxTime: 12, + }, + { + name: "numRequests is 600", + KubernetesOptions: KubernetesOptions{ + KubeConfig: "", + Master: "", + QPS: 40.0, + Burst: 60, + }, + numRequests: 600, + expectedQPS: 40.0, + expectedBurst: 60, + expectedMinTime: 10, + expectedMaxTime: 15, + }, + { + name: "QPS is 1, Burst is 1", + KubernetesOptions: KubernetesOptions{ + KubeConfig: "", + Master: "", + QPS: 1.0, + Burst: 1, + }, + numRequests: 30, + expectedQPS: 1.0, + expectedBurst: 1, + expectedMinTime: 29, + expectedMaxTime: 31, + }, + { + name: "QPS is 0, Burst is 0", + KubernetesOptions: KubernetesOptions{ + KubeConfig: "", + Master: "", + QPS: 0.0, + Burst: 0, + }, + numRequests: 60, + expectedQPS: 5.0, + expectedBurst: 10, + expectedMinTime: 6, + expectedMaxTime: 12, + }, + { + name: "not set", + KubernetesOptions: KubernetesOptions{ + KubeConfig: "", + Master: "", + }, + numRequests: 60, + expectedQPS: 5.0, + expectedBurst: 10, + expectedMinTime: 6, + expectedMaxTime: 12, + }, + } + + restConfig, err := clientcmd.BuildConfigFromFlags("", "/Users/george/.kube/config") + if err != nil { + panic(err) + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + options := KubernetesOptions{ + QPS: tc.expectedQPS, + Burst: tc.expectedBurst, + } + SetQPSBurst(restConfig, options) + + // create client + clientset, err := kubernetes.NewForConfig(restConfig) + if err != nil { + fmt.Printf("error creating clientset: %s\n", err) + return + } + + ctx := context.Background() + var wg sync.WaitGroup + // simulate concurrent requests + numRequests := tc.numRequests + + start := time.Now() + for i := 0; i < numRequests; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := clientset.CoreV1().Pods("").List(ctx, v1.ListOptions{}) + if err != nil { + fmt.Printf("request failed: %s\n", err) + } + }() + } + + // Wait for all requests to complete + wg.Wait() + elapsed := time.Since(start) + fmt.Printf("All requests completed, Execution time: %s\n", elapsed) + + assert.Equal(t, tc.expectedQPS, restConfig.QPS) + assert.Equal(t, tc.expectedBurst, restConfig.Burst) + seconds := elapsed.Seconds() + assert.Condition(t, func() bool { + return seconds > tc.expectedMinTime && seconds < tc.expectedMaxTime + }, "seconds %f should be greater than %f and less than %f", seconds, tc.expectedMinTime, tc.expectedMaxTime) + }) + } +}