From 8b46c6f9b36e1982eddad96d3739d39633f9970e Mon Sep 17 00:00:00 2001 From: corneredrat Date: Sat, 22 Oct 2022 17:43:35 +0530 Subject: [PATCH 1/5] Parameterize concurrency (number of workers). Users should be able to set number of workers --- cmd/utils/flags.go | 4 +++- operator/redisfailover/config.go | 1 + operator/redisfailover/factory.go | 15 ++++++++------- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 2dabd1fda..77fbabcb4 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -16,6 +16,7 @@ type CMDFlags struct { Debug bool ListenAddr string MetricsPath string + Concurrency int } // Init initializes and parse the flags @@ -27,7 +28,7 @@ func (c *CMDFlags) Init() { flag.BoolVar(&c.Debug, "debug", false, "enable debug mode") flag.StringVar(&c.ListenAddr, "listen-address", ":9710", "Address to listen on for metrics.") flag.StringVar(&c.MetricsPath, "metrics-path", "/metrics", "Path to serve the metrics.") - + flag.IntVar(&c.Concurrency, "concurrency", 3, "Number of conccurent workers meant to process events") // Parse flags flag.Parse() } @@ -37,5 +38,6 @@ func (c *CMDFlags) ToRedisOperatorConfig() redisfailover.Config { return redisfailover.Config{ ListenAddress: c.ListenAddr, MetricsPath: c.MetricsPath, + Concurrency: c.Concurrency, } } diff --git a/operator/redisfailover/config.go b/operator/redisfailover/config.go index 4fa2f56b5..97e316779 100644 --- a/operator/redisfailover/config.go +++ b/operator/redisfailover/config.go @@ -4,4 +4,5 @@ package redisfailover type Config struct { ListenAddress string MetricsPath string + Concurrency int } diff --git a/operator/redisfailover/factory.go b/operator/redisfailover/factory.go index 2b4c82c8b..2ebc67eb7 100644 --- a/operator/redisfailover/factory.go +++ b/operator/redisfailover/factory.go @@ -47,13 +47,14 @@ func New(cfg Config, k8sService k8s.Services, k8sClient kubernetes.Interface, lo // Create our controller. return controller.New(&controller.Config{ - Handler: rfHandler, - Retriever: rfRetriever, - LeaderElector: leSVC, - MetricsRecorder: kooperMetricsRecorder, - Logger: kooperLogger, - Name: "redisfailover", - ResyncInterval: resync, + Handler: rfHandler, + Retriever: rfRetriever, + LeaderElector: leSVC, + MetricsRecorder: kooperMetricsRecorder, + Logger: kooperLogger, + Name: "redisfailover", + ResyncInterval: resync, + ConcurrentWorkers: cfg.Concurrency, }) } From ba28ba9f36b51acf747603033922f9da748bbe27 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Sat, 22 Oct 2022 17:51:55 +0530 Subject: [PATCH 2/5] Add comment --- cmd/utils/flags.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 77fbabcb4..7cf6e3965 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -28,6 +28,8 @@ func (c *CMDFlags) Init() { flag.BoolVar(&c.Debug, "debug", false, "enable debug mode") flag.StringVar(&c.ListenAddr, "listen-address", ":9710", "Address to listen on for metrics.") flag.StringVar(&c.MetricsPath, "metrics-path", "/metrics", "Path to serve the metrics.") + // default is 3 for conccurency because kooper also defines 3 as default + // reference: https://github.com/spotahome/kooper/blob/master/controller/controller.go#L89 flag.IntVar(&c.Concurrency, "concurrency", 3, "Number of conccurent workers meant to process events") // Parse flags flag.Parse() From d5fb36f94df3b3e196b5b294635770bd77791833 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Sat, 22 Oct 2022 18:04:50 +0530 Subject: [PATCH 3/5] Parameterize queries per second and queries burstable limit --- cmd/utils/flags.go | 15 +++++++++------ cmd/utils/k8s.go | 9 ++------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 2dabd1fda..4acff0c20 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -11,11 +11,13 @@ import ( // CMDFlags are the flags used by the cmd // TODO: improve flags. type CMDFlags struct { - KubeConfig string - Development bool - Debug bool - ListenAddr string - MetricsPath string + KubeConfig string + Development bool + Debug bool + ListenAddr string + MetricsPath string + K8sQueriesPerSecond int + K8sQueriesBurstable int } // Init initializes and parse the flags @@ -27,7 +29,8 @@ func (c *CMDFlags) Init() { flag.BoolVar(&c.Debug, "debug", false, "enable debug mode") flag.StringVar(&c.ListenAddr, "listen-address", ":9710", "Address to listen on for metrics.") flag.StringVar(&c.MetricsPath, "metrics-path", "/metrics", "Path to serve the metrics.") - + flag.IntVar(&c.K8sQueriesPerSecond, "k8s-cli-qps-limit", 100, "Number of allowed queries per second by kubernetes client without client side throttling") + flag.IntVar(&c.K8sQueriesPerSecond, "k8s-cli-burstable-limit", 100, "Number of allowed burst requests by kubernetes client without client side throttling") // Parse flags flag.Parse() } diff --git a/cmd/utils/k8s.go b/cmd/utils/k8s.go index 7c3d79c6a..a470931f8 100644 --- a/cmd/utils/k8s.go +++ b/cmd/utils/k8s.go @@ -11,11 +11,6 @@ import ( redisfailoverclientset "github.com/spotahome/redis-operator/client/k8s/clientset/versioned" ) -const ( - defCliQPS = 100 - defCliBurst = 100 -) - // LoadKubernetesConfig loads kubernetes configuration based on flags. func LoadKubernetesConfig(flags *CMDFlags) (*rest.Config, error) { var cfg *rest.Config @@ -34,8 +29,8 @@ func LoadKubernetesConfig(flags *CMDFlags) (*rest.Config, error) { cfg = config } - cfg.QPS = defCliQPS - cfg.Burst = defCliBurst + cfg.QPS = float32(flags.K8sQueriesPerSecond) + cfg.Burst = flags.K8sQueriesBurstable return cfg, nil } From 1c8ec37a2616a010f3a71575b4ca8d398357dbe7 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Sat, 22 Oct 2022 18:54:25 +0530 Subject: [PATCH 4/5] Fix typo --- cmd/utils/flags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 4acff0c20..bf325df68 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -30,7 +30,7 @@ func (c *CMDFlags) Init() { flag.StringVar(&c.ListenAddr, "listen-address", ":9710", "Address to listen on for metrics.") flag.StringVar(&c.MetricsPath, "metrics-path", "/metrics", "Path to serve the metrics.") flag.IntVar(&c.K8sQueriesPerSecond, "k8s-cli-qps-limit", 100, "Number of allowed queries per second by kubernetes client without client side throttling") - flag.IntVar(&c.K8sQueriesPerSecond, "k8s-cli-burstable-limit", 100, "Number of allowed burst requests by kubernetes client without client side throttling") + flag.IntVar(&c.K8sQueriesBurstable, "k8s-cli-burstable-limit", 100, "Number of allowed burst requests by kubernetes client without client side throttling") // Parse flags flag.Parse() } From 88ae4f5909a9ad39b4d11316e8eeafcb0d61d6c5 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Fri, 11 Nov 2022 16:36:14 +0530 Subject: [PATCH 5/5] format changes --- cmd/utils/flags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 8e5ea58e1..578fe0e62 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -18,7 +18,7 @@ type CMDFlags struct { K8sQueriesPerSecond int K8sQueriesBurstable int Concurrency int - LogLevel string + LogLevel string } // Init initializes and parse the flags