Skip to content

Commit

Permalink
Merge pull request #455 from huww98/type-queue
Browse files Browse the repository at this point in the history
replace deprecated Queue and RateLimiter
  • Loading branch information
k8s-ci-robot authored Dec 2, 2024
2 parents 3ea4ee4 + 371d6e5 commit 73401c9
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 23 deletions.
4 changes: 2 additions & 2 deletions cmd/csi-resizer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,15 @@ func main() {

resizerName := csiResizer.Name()
rc := controller.NewResizeController(resizerName, csiResizer, kubeClient, *resyncPeriod, informerFactory,
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
*handleVolumeInUseError, *retryIntervalMax)

modifierName := csiModifier.Name()
var mc modifycontroller.ModifyController
// Add modify controller only if the feature gate is enabled
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass) {
mc = modifycontroller.NewModifyController(modifierName, csiModifier, kubeClient, *resyncPeriod, *extraModifyMetadata, informerFactory,
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax))
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax))
}

run := func(ctx context.Context) {
Expand Down
12 changes: 7 additions & 5 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type resizeController struct {
name string
resizer resizer.Resizer
kubeClient kubernetes.Interface
claimQueue workqueue.RateLimitingInterface
claimQueue workqueue.TypedRateLimitingInterface[string]
eventRecorder record.EventRecorder
pvSynced cache.InformerSynced
pvcSynced cache.InformerSynced
Expand Down Expand Up @@ -87,7 +87,7 @@ func NewResizeController(
kubeClient kubernetes.Interface,
resyncPeriod time.Duration,
informerFactory informers.SharedInformerFactory,
pvcRateLimiter workqueue.RateLimiter,
pvcRateLimiter workqueue.TypedRateLimiter[string],
handleVolumeInUseError bool,
maxRetryInterval time.Duration) ResizeController {
pvInformer := informerFactory.Core().V1().PersistentVolumes()
Expand All @@ -98,8 +98,10 @@ func NewResizeController(
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme,
v1.EventSource{Component: fmt.Sprintf("external-resizer %s", name)})

claimQueue := workqueue.NewNamedRateLimitingQueue(
pvcRateLimiter, fmt.Sprintf("%s-pvc", name))
claimQueue := workqueue.NewTypedRateLimitingQueueWithConfig(
pvcRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{
Name: fmt.Sprintf("%s-pvc", name),
})

ctrl := &resizeController{
name: name,
Expand Down Expand Up @@ -302,7 +304,7 @@ func (ctrl *resizeController) syncPVCs() {
}
defer ctrl.claimQueue.Done(key)

err := ctrl.syncPVC(key.(string))
err := ctrl.syncPVC(key)

if err != nil {
if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) && util.IsDelayRetryError(err) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestController(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AnnotateFsResize, true)
controller := NewResizeController(driverName, csiResizer,
kubeClient, time.Second,
informerFactory, workqueue.DefaultControllerRateLimiter(),
informerFactory, workqueue.DefaultTypedControllerRateLimiter[string](),
!test.disableVolumeInUseErrorHandler,
2*time.Minute /* maxRetryInterval */)

Expand Down Expand Up @@ -410,7 +410,7 @@ func TestResizePVC(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AnnotateFsResize, true)
controller := NewResizeController(driverName, csiResizer,
kubeClient, time.Second,
informerFactory, workqueue.DefaultControllerRateLimiter(),
informerFactory, workqueue.DefaultTypedControllerRateLimiter[string](),
true, /* disableVolumeInUseErrorHandler*/
2*time.Minute /* maxRetryInterval */)

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/expand_and_recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestExpandAndRecover(t *testing.T) {
controller := NewResizeController(driverName,
csiResizer, kubeClient,
time.Second, informerFactory,
workqueue.DefaultControllerRateLimiter(), true /*handleVolumeInUseError*/, 2*time.Minute /*maxRetryInterval*/)
workqueue.DefaultTypedControllerRateLimiter[string](), true /*handleVolumeInUseError*/, 2*time.Minute /*maxRetryInterval*/)

ctrlInstance, _ := controller.(*resizeController)
recorder := record.NewFakeRecorder(10)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/resize_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestResizeFunctions(t *testing.T) {
controller := NewResizeController(driverName,
csiResizer, kubeClient,
time.Second, informerFactory,
workqueue.DefaultControllerRateLimiter(),
workqueue.DefaultTypedControllerRateLimiter[string](),
true, /*handleVolumeInUseError*/
2*time.Minute /*maxRetryInterval*/)

Expand Down
12 changes: 7 additions & 5 deletions pkg/modifycontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type modifyController struct {
name string
modifier modifier.Modifier
kubeClient kubernetes.Interface
claimQueue workqueue.RateLimitingInterface
claimQueue workqueue.TypedRateLimitingInterface[string]
eventRecorder record.EventRecorder
pvLister corelisters.PersistentVolumeLister
pvListerSynced cache.InformerSynced
Expand All @@ -71,7 +71,7 @@ func NewModifyController(
resyncPeriod time.Duration,
extraModifyMetadata bool,
informerFactory informers.SharedInformerFactory,
pvcRateLimiter workqueue.RateLimiter) ModifyController {
pvcRateLimiter workqueue.TypedRateLimiter[string]) ModifyController {
pvInformer := informerFactory.Core().V1().PersistentVolumes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses()
Expand All @@ -81,8 +81,10 @@ func NewModifyController(
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme,
v1.EventSource{Component: fmt.Sprintf("external-resizer %s", name)})

claimQueue := workqueue.NewNamedRateLimitingQueue(
pvcRateLimiter, fmt.Sprintf("%s-pvc", name))
claimQueue := workqueue.NewTypedRateLimitingQueueWithConfig(
pvcRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{
Name: fmt.Sprintf("%s-pvc", name),
})

ctrl := &modifyController{
name: name,
Expand Down Expand Up @@ -243,7 +245,7 @@ func (ctrl *modifyController) sync() {
}
defer ctrl.claimQueue.Done(key)

if err := ctrl.syncPVC(key.(string)); err != nil {
if err := ctrl.syncPVC(key); err != nil {
// Put PVC back to the queue so that we can retry later.
klog.ErrorS(err, "Error syncing PVC")
ctrl.claimQueue.AddRateLimited(key)
Expand Down
4 changes: 2 additions & 2 deletions pkg/modifycontroller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestController(t *testing.T) {
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, false, informerFactory,
workqueue.DefaultControllerRateLimiter())
workqueue.DefaultTypedControllerRateLimiter[string]())

ctrlInstance, _ := controller.(*modifyController)

Expand Down Expand Up @@ -188,7 +188,7 @@ func TestModifyPVC(t *testing.T) {
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, false, informerFactory,
workqueue.DefaultControllerRateLimiter())
workqueue.DefaultTypedControllerRateLimiter[string]())

ctrlInstance, _ := controller.(*modifyController)

Expand Down
8 changes: 4 additions & 4 deletions pkg/modifycontroller/modify_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestMarkControllerModifyVolumeStatus(t *testing.T) {
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, false, informerFactory,
workqueue.DefaultControllerRateLimiter())
workqueue.DefaultTypedControllerRateLimiter[string]())

ctrlInstance, _ := controller.(*modifyController)

Expand Down Expand Up @@ -179,7 +179,7 @@ func TestUpdateConditionBasedOnError(t *testing.T) {
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, false, informerFactory,
workqueue.DefaultControllerRateLimiter())
workqueue.DefaultTypedControllerRateLimiter[string]())

ctrlInstance, _ := controller.(*modifyController)

Expand Down Expand Up @@ -247,7 +247,7 @@ func TestMarkControllerModifyVolumeCompleted(t *testing.T) {
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, false, informerFactory,
workqueue.DefaultControllerRateLimiter())
workqueue.DefaultTypedControllerRateLimiter[string]())

ctrlInstance, _ := controller.(*modifyController)

Expand Down Expand Up @@ -313,7 +313,7 @@ func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) {
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, false, informerFactory,
workqueue.DefaultControllerRateLimiter())
workqueue.DefaultTypedControllerRateLimiter[string]())

ctrlInstance, _ := controller.(*modifyController)

Expand Down
2 changes: 1 addition & 1 deletion pkg/modifycontroller/modify_volume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestModify(t *testing.T) {
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, test.withExtraMetadata, informerFactory,
workqueue.DefaultControllerRateLimiter())
workqueue.DefaultTypedControllerRateLimiter[string]())

ctrlInstance, _ := controller.(*modifyController)

Expand Down

0 comments on commit 73401c9

Please sign in to comment.