diff --git a/pkg/controller/node_watcher.go b/pkg/controller/node_watcher.go index 5cd99286..9b3219dd 100644 --- a/pkg/controller/node_watcher.go +++ b/pkg/controller/node_watcher.go @@ -71,9 +71,17 @@ type NodeWatcher struct { } // NewNodeWatcher creates a node watcher object that will watch the nodes -func NewNodeWatcher(driverName string, client kubernetes.Interface, volumeLister corelisters.PersistentVolumeLister, - pvcLister corelisters.PersistentVolumeClaimLister, nodeInformer coreinformers.NodeInformer, - recorder record.EventRecorder, pvcToPodsCache *util.PVCToPodsCache, nodeWorkerExecuteInterval time.Duration, nodeListAndAddInterval time.Duration) *NodeWatcher { +func NewNodeWatcher(driverName string, + client kubernetes.Interface, + volumeLister corelisters.PersistentVolumeLister, + pvcLister corelisters.PersistentVolumeClaimLister, + nodeInformer coreinformers.NodeInformer, + recorder record.EventRecorder, + pvcToPodsCache *util.PVCToPodsCache, + nodeWorkerExecuteInterval time.Duration, + nodeListAndAddInterval time.Duration, + contentRateLimiter workqueue.RateLimiter, +) *NodeWatcher { watcher := &NodeWatcher{ driverName: driverName, @@ -83,7 +91,7 @@ func NewNodeWatcher(driverName string, client kubernetes.Interface, volumeLister recorder: recorder, volumeLister: volumeLister, pvcLister: pvcLister, - nodeQueue: workqueue.NewNamed("nodes"), + nodeQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "nodes"), nodeFirstBrokenMap: make(map[string]time.Time), nodeEverMarkedDown: make(map[string]bool), pvcToPodsCache: pvcToPodsCache, diff --git a/pkg/controller/pv_monitor_controller.go b/pkg/controller/pv_monitor_controller.go index 63c73317..840b4d3e 100644 --- a/pkg/controller/pv_monitor_controller.go +++ b/pkg/controller/pv_monitor_controller.go @@ -161,7 +161,18 @@ func NewPVMonitorController( }) if ctrl.enableNodeWatcher { - ctrl.nodeWatcher = NewNodeWatcher(ctrl.driverName, ctrl.client, ctrl.pvLister, ctrl.pvcLister, nodeInformer, ctrl.eventRecorder, ctrl.pvcToPodsCache, option.NodeWorkerExecuteInterval, option.NodeListAndAddInterval) + ctrl.nodeWatcher = NewNodeWatcher( + ctrl.driverName, + ctrl.client, + ctrl.pvLister, + ctrl.pvcLister, + nodeInformer, + ctrl.eventRecorder, + ctrl.pvcToPodsCache, + option.NodeWorkerExecuteInterval, + option.NodeListAndAddInterval, + contentRateLimiter, + ) } ctrl.pvChecker = handler.NewPVHealthConditionChecker(option.DriverName, conn, client, option.ContextTimeout, ctrl.pvcLister, ctrl.pvLister, eventInformer, ctrl.eventRecorder)