From 371d6e53a4730da8f3fd073c00c3e1710c2c2a13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Mon, 25 Nov 2024 20:13:08 +0800 Subject: [PATCH] replace deprecated Queue and RateLimiter --- cmd/csi-resizer/main.go | 4 ++-- pkg/controller/controller.go | 12 +++++++----- pkg/controller/controller_test.go | 4 ++-- pkg/controller/expand_and_recover_test.go | 2 +- pkg/controller/resize_status_test.go | 2 +- pkg/modifycontroller/controller.go | 12 +++++++----- pkg/modifycontroller/controller_test.go | 4 ++-- pkg/modifycontroller/modify_status_test.go | 8 ++++---- pkg/modifycontroller/modify_volume_test.go | 2 +- 9 files changed, 27 insertions(+), 23 deletions(-) diff --git a/cmd/csi-resizer/main.go b/cmd/csi-resizer/main.go index 7395ffb13..6e0b7bc57 100644 --- a/cmd/csi-resizer/main.go +++ b/cmd/csi-resizer/main.go @@ -215,7 +215,7 @@ func main() { resizerName := csiResizer.Name() rc := controller.NewResizeController(resizerName, csiResizer, kubeClient, *resyncPeriod, informerFactory, - workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), *handleVolumeInUseError, *retryIntervalMax) modifierName := csiModifier.Name() @@ -223,7 +223,7 @@ func main() { // Add modify controller only if the feature gate is enabled if utilfeature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass) { mc = modifycontroller.NewModifyController(modifierName, csiModifier, kubeClient, *resyncPeriod, *extraModifyMetadata, informerFactory, - workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)) + workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax)) } run := func(ctx context.Context) { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 3533f010f..952d0973e 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -58,7 +58,7 @@ type resizeController struct { name string resizer resizer.Resizer kubeClient kubernetes.Interface - claimQueue workqueue.RateLimitingInterface + claimQueue workqueue.TypedRateLimitingInterface[string] eventRecorder record.EventRecorder pvSynced cache.InformerSynced pvcSynced cache.InformerSynced @@ -87,7 +87,7 @@ func NewResizeController( kubeClient kubernetes.Interface, resyncPeriod time.Duration, informerFactory informers.SharedInformerFactory, - pvcRateLimiter workqueue.RateLimiter, + pvcRateLimiter workqueue.TypedRateLimiter[string], handleVolumeInUseError bool, maxRetryInterval time.Duration) ResizeController { pvInformer := informerFactory.Core().V1().PersistentVolumes() @@ -98,8 +98,10 @@ func NewResizeController( eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("external-resizer %s", name)}) - claimQueue := workqueue.NewNamedRateLimitingQueue( - pvcRateLimiter, fmt.Sprintf("%s-pvc", name)) + claimQueue := workqueue.NewTypedRateLimitingQueueWithConfig( + pvcRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{ + Name: fmt.Sprintf("%s-pvc", name), + }) ctrl := &resizeController{ name: name, @@ -302,7 +304,7 @@ func (ctrl *resizeController) syncPVCs() { } defer ctrl.claimQueue.Done(key) - err := ctrl.syncPVC(key.(string)) + err := ctrl.syncPVC(key) if err != nil { if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) && util.IsDelayRetryError(err) { diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index fb1e802b7..b54e3d222 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -245,7 +245,7 @@ func TestController(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AnnotateFsResize, true) controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, - informerFactory, workqueue.DefaultControllerRateLimiter(), + informerFactory, workqueue.DefaultTypedControllerRateLimiter[string](), !test.disableVolumeInUseErrorHandler, 2*time.Minute /* maxRetryInterval */) @@ -410,7 +410,7 @@ func TestResizePVC(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AnnotateFsResize, true) controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, - informerFactory, workqueue.DefaultControllerRateLimiter(), + informerFactory, workqueue.DefaultTypedControllerRateLimiter[string](), true, /* disableVolumeInUseErrorHandler*/ 2*time.Minute /* maxRetryInterval */) diff --git a/pkg/controller/expand_and_recover_test.go b/pkg/controller/expand_and_recover_test.go index 394148b2e..469db86c6 100644 --- a/pkg/controller/expand_and_recover_test.go +++ b/pkg/controller/expand_and_recover_test.go @@ -173,7 +173,7 @@ func TestExpandAndRecover(t *testing.T) { controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, informerFactory, - workqueue.DefaultControllerRateLimiter(), true /*handleVolumeInUseError*/, 2*time.Minute /*maxRetryInterval*/) + workqueue.DefaultTypedControllerRateLimiter[string](), true /*handleVolumeInUseError*/, 2*time.Minute /*maxRetryInterval*/) ctrlInstance, _ := controller.(*resizeController) recorder := record.NewFakeRecorder(10) diff --git a/pkg/controller/resize_status_test.go b/pkg/controller/resize_status_test.go index e3804471c..7e13bfea6 100644 --- a/pkg/controller/resize_status_test.go +++ b/pkg/controller/resize_status_test.go @@ -94,7 +94,7 @@ func TestResizeFunctions(t *testing.T) { controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, informerFactory, - workqueue.DefaultControllerRateLimiter(), + workqueue.DefaultTypedControllerRateLimiter[string](), true, /*handleVolumeInUseError*/ 2*time.Minute /*maxRetryInterval*/) diff --git a/pkg/modifycontroller/controller.go b/pkg/modifycontroller/controller.go index cae14c14e..26f17a221 100644 --- a/pkg/modifycontroller/controller.go +++ b/pkg/modifycontroller/controller.go @@ -50,7 +50,7 @@ type modifyController struct { name string modifier modifier.Modifier kubeClient kubernetes.Interface - claimQueue workqueue.RateLimitingInterface + claimQueue workqueue.TypedRateLimitingInterface[string] eventRecorder record.EventRecorder pvLister corelisters.PersistentVolumeLister pvListerSynced cache.InformerSynced @@ -71,7 +71,7 @@ func NewModifyController( resyncPeriod time.Duration, extraModifyMetadata bool, informerFactory informers.SharedInformerFactory, - pvcRateLimiter workqueue.RateLimiter) ModifyController { + pvcRateLimiter workqueue.TypedRateLimiter[string]) ModifyController { pvInformer := informerFactory.Core().V1().PersistentVolumes() pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses() @@ -81,8 +81,10 @@ func NewModifyController( eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("external-resizer %s", name)}) - claimQueue := workqueue.NewNamedRateLimitingQueue( - pvcRateLimiter, fmt.Sprintf("%s-pvc", name)) + claimQueue := workqueue.NewTypedRateLimitingQueueWithConfig( + pvcRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{ + Name: fmt.Sprintf("%s-pvc", name), + }) ctrl := &modifyController{ name: name, @@ -243,7 +245,7 @@ func (ctrl *modifyController) sync() { } defer ctrl.claimQueue.Done(key) - if err := ctrl.syncPVC(key.(string)); err != nil { + if err := ctrl.syncPVC(key); err != nil { // Put PVC back to the queue so that we can retry later. klog.ErrorS(err, "Error syncing PVC") ctrl.claimQueue.AddRateLimited(key) diff --git a/pkg/modifycontroller/controller_test.go b/pkg/modifycontroller/controller_test.go index ce9e95188..91f47bef6 100644 --- a/pkg/modifycontroller/controller_test.go +++ b/pkg/modifycontroller/controller_test.go @@ -87,7 +87,7 @@ func TestController(t *testing.T) { controller := NewModifyController(driverName, csiModifier, kubeClient, time.Second, false, informerFactory, - workqueue.DefaultControllerRateLimiter()) + workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) @@ -188,7 +188,7 @@ func TestModifyPVC(t *testing.T) { controller := NewModifyController(driverName, csiModifier, kubeClient, time.Second, false, informerFactory, - workqueue.DefaultControllerRateLimiter()) + workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) diff --git a/pkg/modifycontroller/modify_status_test.go b/pkg/modifycontroller/modify_status_test.go index bc34d4d37..fb49778a6 100644 --- a/pkg/modifycontroller/modify_status_test.go +++ b/pkg/modifycontroller/modify_status_test.go @@ -119,7 +119,7 @@ func TestMarkControllerModifyVolumeStatus(t *testing.T) { controller := NewModifyController(driverName, csiModifier, kubeClient, time.Second, false, informerFactory, - workqueue.DefaultControllerRateLimiter()) + workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) @@ -179,7 +179,7 @@ func TestUpdateConditionBasedOnError(t *testing.T) { controller := NewModifyController(driverName, csiModifier, kubeClient, time.Second, false, informerFactory, - workqueue.DefaultControllerRateLimiter()) + workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) @@ -247,7 +247,7 @@ func TestMarkControllerModifyVolumeCompleted(t *testing.T) { controller := NewModifyController(driverName, csiModifier, kubeClient, time.Second, false, informerFactory, - workqueue.DefaultControllerRateLimiter()) + workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) @@ -313,7 +313,7 @@ func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) { controller := NewModifyController(driverName, csiModifier, kubeClient, time.Second, false, informerFactory, - workqueue.DefaultControllerRateLimiter()) + workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController) diff --git a/pkg/modifycontroller/modify_volume_test.go b/pkg/modifycontroller/modify_volume_test.go index 7f09275f9..76770c6d1 100644 --- a/pkg/modifycontroller/modify_volume_test.go +++ b/pkg/modifycontroller/modify_volume_test.go @@ -136,7 +136,7 @@ func TestModify(t *testing.T) { controller := NewModifyController(driverName, csiModifier, kubeClient, time.Second, test.withExtraMetadata, informerFactory, - workqueue.DefaultControllerRateLimiter()) + workqueue.DefaultTypedControllerRateLimiter[string]()) ctrlInstance, _ := controller.(*modifyController)