Skip to content

Commit

Permalink
add configurable throughput for clients
Browse files Browse the repository at this point in the history
Signed-off-by: Ashutosh Kumar <[email protected]>
  • Loading branch information
sonasingh46 committed Feb 6, 2022
1 parent 88b3d0c commit fba209e
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 8 deletions.
33 changes: 29 additions & 4 deletions cmd/csi-external-health-monitor-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package main

import (
"context"
"flag"
"fmt"
"net/http"
"os"
"time"

flag "github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand All @@ -32,6 +32,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -74,6 +75,11 @@ var (
metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")

retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed pv monitoring. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed pv monitoring. Default is 5 minutes.")
kubeAPIQPS = flag.Float32("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")
)

var (
Expand Down Expand Up @@ -112,6 +118,9 @@ func main() {
os.Exit(1)
}

config.QPS = *kubeAPIQPS
config.Burst = *kubeAPIBurst

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Error(err.Error())
Expand Down Expand Up @@ -210,8 +219,18 @@ func main() {
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: clientset.CoreV1().Events(v1.NamespaceAll)})
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-pv-monitor-controller-%s", option.DriverName)})

monitorController := monitorcontroller.NewPVMonitorController(clientset, csiConn, factory.Core().V1().PersistentVolumes(),
factory.Core().V1().PersistentVolumeClaims(), factory.Core().V1().Pods(), factory.Core().V1().Nodes(), factory.Core().V1().Events(), eventRecorder, &option)
monitorController := monitorcontroller.NewPVMonitorController(
clientset,
csiConn,
factory.Core().V1().PersistentVolumes(),
factory.Core().V1().PersistentVolumeClaims(),
factory.Core().V1().Pods(),
factory.Core().V1().Nodes(),
factory.Core().V1().Events(),
eventRecorder,
&option,
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
)

run := func(ctx context.Context) {
stopCh := ctx.Done()
Expand All @@ -224,7 +243,13 @@ func main() {
} else {
// Name of config map with leader election lock
lockName := "external-health-monitor-leader-" + storageDriver
le := leaderelection.NewLeaderElection(clientset, lockName, run)
// create a new clientset for leader election
leClientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create leaderelection client: %v", err)
}
le := leaderelection.NewLeaderElection(leClientset, lockName, run)

if *httpEndpoint != "" {
le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/imdario/mergo v0.3.11 // indirect
github.com/kubernetes-csi/csi-lib-utils v0.10.0
github.com/kubernetes-csi/csi-test/v3 v3.1.2-0.20200722022205-189919973123
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
14 changes: 13 additions & 1 deletion pkg/controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

"k8s.io/client-go/util/workqueue"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
Expand Down Expand Up @@ -92,7 +94,17 @@ func runTest(t *testing.T, tc *testCase) {
}

mockCSIcontrollerServer(controllerServer, tc.supportListVolumes, volumes)
pvMonitorController := NewPVMonitorController(client, csiConn, pvInformer, pvcInformer, podInformer, nodeInformer, eventInformer, &eventRecorder, option)
pvMonitorController := NewPVMonitorController(client,
csiConn,
pvInformer,
pvcInformer,
podInformer,
nodeInformer,
eventInformer,
&eventRecorder,
option,
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
)
assert.NotNil(pvMonitorController)

if tc.hasRecoveryEvent {
Expand Down
16 changes: 13 additions & 3 deletions pkg/controller/pv_monitor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,18 @@ type PVMonitorOptions struct {
}

// NewPVMonitorController creates PV monitor controller
func NewPVMonitorController(client kubernetes.Interface, conn *grpc.ClientConn, pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, eventInformer coreinformers.EventInformer, eventRecorder record.EventRecorder, option *PVMonitorOptions) *PVMonitorController {
func NewPVMonitorController(
client kubernetes.Interface,
conn *grpc.ClientConn,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
eventInformer coreinformers.EventInformer,
eventRecorder record.EventRecorder,
option *PVMonitorOptions,
contentRateLimiter workqueue.RateLimiter,
) *PVMonitorController {

ctrl := &PVMonitorController{
csiConn: conn,
Expand All @@ -106,7 +116,7 @@ func NewPVMonitorController(client kubernetes.Interface, conn *grpc.ClientConn,
enableNodeWatcher: option.EnableNodeWatcher,
client: client,
driverName: option.DriverName,
pvQueue: workqueue.NewNamed("csi-monitor-pv-queue"),
pvQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "csi-monitor-pv-queue"),

pvcToPodsCache: util.NewPVCToPodsCache(),
pvEnqueued: make(map[string]bool),
Expand Down
1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ github.com/prometheus/procfs
github.com/prometheus/procfs/internal/fs
github.com/prometheus/procfs/internal/util
# github.com/spf13/pflag v1.0.5
## explicit
github.com/spf13/pflag
# github.com/stretchr/testify v1.7.0
## explicit
Expand Down

0 comments on commit fba209e

Please sign in to comment.