diff --git a/cmd/csi-external-health-monitor-controller/main.go b/cmd/csi-external-health-monitor-controller/main.go index 77af21d2..24b55f0b 100644 --- a/cmd/csi-external-health-monitor-controller/main.go +++ b/cmd/csi-external-health-monitor-controller/main.go @@ -18,12 +18,12 @@ package main import ( "context" - "flag" "fmt" "net/http" "os" "time" + flag "github.com/spf13/pflag" v1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -32,6 +32,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "github.com/container-storage-interface/spec/lib/go/csi" @@ -74,6 +75,11 @@ var ( metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") + + retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed pv monitoring. It doubles with each failure, up to retry-interval-max. Default is 1 second.") + retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed pv monitoring. Default is 5 minutes.") + kubeAPIQPS = flag.Float32("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.") + kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.") ) var ( @@ -112,6 +118,9 @@ func main() { os.Exit(1) } + config.QPS = *kubeAPIQPS + config.Burst = *kubeAPIBurst + clientset, err := kubernetes.NewForConfig(config) if err != nil { klog.Error(err.Error()) @@ -210,8 +219,18 @@ func main() { broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: clientset.CoreV1().Events(v1.NamespaceAll)}) eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-pv-monitor-controller-%s", option.DriverName)}) - monitorController := monitorcontroller.NewPVMonitorController(clientset, csiConn, factory.Core().V1().PersistentVolumes(), - factory.Core().V1().PersistentVolumeClaims(), factory.Core().V1().Pods(), factory.Core().V1().Nodes(), factory.Core().V1().Events(), eventRecorder, &option) + monitorController := monitorcontroller.NewPVMonitorController( + clientset, + csiConn, + factory.Core().V1().PersistentVolumes(), + factory.Core().V1().PersistentVolumeClaims(), + factory.Core().V1().Pods(), + factory.Core().V1().Nodes(), + factory.Core().V1().Events(), + eventRecorder, + &option, + workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), + ) run := func(ctx context.Context) { stopCh := ctx.Done() @@ -224,7 +243,13 @@ func main() { } else { // Name of config map with leader election lock lockName := "external-health-monitor-leader-" + storageDriver - le := leaderelection.NewLeaderElection(clientset, lockName, run) + // create a new clientset for leader election + leClientset, err := kubernetes.NewForConfig(config) + if err != nil { + klog.Fatalf("Failed to create leaderelection client: %v", err) + } + le := leaderelection.NewLeaderElection(leClientset, lockName, run) + if *httpEndpoint != "" { le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout) } diff --git a/go.mod b/go.mod index e1b49c94..f7c396f5 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/imdario/mergo v0.3.11 // indirect github.com/kubernetes-csi/csi-lib-utils v0.10.0 github.com/kubernetes-csi/csi-test/v3 v3.1.2-0.20200722022205-189919973123 + github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/pkg/controller/framework_test.go b/pkg/controller/framework_test.go index f758a194..3531fee9 100644 --- a/pkg/controller/framework_test.go +++ b/pkg/controller/framework_test.go @@ -5,6 +5,8 @@ import ( "testing" "time" + "k8s.io/client-go/util/workqueue" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -92,7 +94,17 @@ func runTest(t *testing.T, tc *testCase) { } mockCSIcontrollerServer(controllerServer, tc.supportListVolumes, volumes) - pvMonitorController := NewPVMonitorController(client, csiConn, pvInformer, pvcInformer, podInformer, nodeInformer, eventInformer, &eventRecorder, option) + pvMonitorController := NewPVMonitorController(client, + csiConn, + pvInformer, + pvcInformer, + podInformer, + nodeInformer, + eventInformer, + &eventRecorder, + option, + workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), + ) assert.NotNil(pvMonitorController) if tc.hasRecoveryEvent { diff --git a/pkg/controller/node_watcher.go b/pkg/controller/node_watcher.go index 5cd99286..9b3219dd 100644 --- a/pkg/controller/node_watcher.go +++ b/pkg/controller/node_watcher.go @@ -71,9 +71,17 @@ type NodeWatcher struct { } // NewNodeWatcher creates a node watcher object that will watch the nodes -func NewNodeWatcher(driverName string, client kubernetes.Interface, volumeLister corelisters.PersistentVolumeLister, - pvcLister corelisters.PersistentVolumeClaimLister, nodeInformer coreinformers.NodeInformer, - recorder record.EventRecorder, pvcToPodsCache *util.PVCToPodsCache, nodeWorkerExecuteInterval time.Duration, nodeListAndAddInterval time.Duration) *NodeWatcher { +func NewNodeWatcher(driverName string, + client kubernetes.Interface, + volumeLister corelisters.PersistentVolumeLister, + pvcLister corelisters.PersistentVolumeClaimLister, + nodeInformer coreinformers.NodeInformer, + recorder record.EventRecorder, + pvcToPodsCache *util.PVCToPodsCache, + nodeWorkerExecuteInterval time.Duration, + nodeListAndAddInterval time.Duration, + contentRateLimiter workqueue.RateLimiter, +) *NodeWatcher { watcher := &NodeWatcher{ driverName: driverName, @@ -83,7 +91,7 @@ func NewNodeWatcher(driverName string, client kubernetes.Interface, volumeLister recorder: recorder, volumeLister: volumeLister, pvcLister: pvcLister, - nodeQueue: workqueue.NewNamed("nodes"), + nodeQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "nodes"), nodeFirstBrokenMap: make(map[string]time.Time), nodeEverMarkedDown: make(map[string]bool), pvcToPodsCache: pvcToPodsCache, diff --git a/pkg/controller/pv_monitor_controller.go b/pkg/controller/pv_monitor_controller.go index 4e945953..bb41a2da 100644 --- a/pkg/controller/pv_monitor_controller.go +++ b/pkg/controller/pv_monitor_controller.go @@ -96,8 +96,18 @@ type PVMonitorOptions struct { } // NewPVMonitorController creates PV monitor controller -func NewPVMonitorController(client kubernetes.Interface, conn *grpc.ClientConn, pvInformer coreinformers.PersistentVolumeInformer, - pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, eventInformer coreinformers.EventInformer, eventRecorder record.EventRecorder, option *PVMonitorOptions) *PVMonitorController { +func NewPVMonitorController( + client kubernetes.Interface, + conn *grpc.ClientConn, + pvInformer coreinformers.PersistentVolumeInformer, + pvcInformer coreinformers.PersistentVolumeClaimInformer, + podInformer coreinformers.PodInformer, + nodeInformer coreinformers.NodeInformer, + eventInformer coreinformers.EventInformer, + eventRecorder record.EventRecorder, + option *PVMonitorOptions, + contentRateLimiter workqueue.RateLimiter, +) *PVMonitorController { ctrl := &PVMonitorController{ csiConn: conn, @@ -106,7 +116,7 @@ func NewPVMonitorController(client kubernetes.Interface, conn *grpc.ClientConn, enableNodeWatcher: option.EnableNodeWatcher, client: client, driverName: option.DriverName, - pvQueue: workqueue.NewNamed("csi-monitor-pv-queue"), + pvQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "csi-monitor-pv-queue"), pvcToPodsCache: util.NewPVCToPodsCache(), pvEnqueued: make(map[string]bool), @@ -151,7 +161,18 @@ func NewPVMonitorController(client kubernetes.Interface, conn *grpc.ClientConn, }) if ctrl.enableNodeWatcher { - ctrl.nodeWatcher = NewNodeWatcher(ctrl.driverName, ctrl.client, ctrl.pvLister, ctrl.pvcLister, nodeInformer, ctrl.eventRecorder, ctrl.pvcToPodsCache, option.NodeWorkerExecuteInterval, option.NodeListAndAddInterval) + ctrl.nodeWatcher = NewNodeWatcher( + ctrl.driverName, + ctrl.client, + ctrl.pvLister, + ctrl.pvcLister, + nodeInformer, + ctrl.eventRecorder, + ctrl.pvcToPodsCache, + option.NodeWorkerExecuteInterval, + option.NodeListAndAddInterval, + contentRateLimiter, + ) } ctrl.pvChecker = handler.NewPVHealthConditionChecker(option.DriverName, conn, client, option.ContextTimeout, ctrl.pvcLister, ctrl.pvLister, eventInformer, ctrl.eventRecorder) diff --git a/vendor/modules.txt b/vendor/modules.txt index 42678b73..fcb31436 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -86,6 +86,7 @@ github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util # github.com/spf13/pflag v1.0.5 +## explicit github.com/spf13/pflag # github.com/stretchr/testify v1.7.0 ## explicit