Skip to content

Commit

Permalink
Merge pull request #420 from mdzraf/resizerExtraMetadataFeature
Browse files Browse the repository at this point in the history
Add extra-modify-metadata flag to external-resizer sidecar
  • Loading branch information
k8s-ci-robot authored Sep 5, 2024
2 parents 57aecf4 + 75c51b6 commit 7461a0d
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 63 deletions.
5 changes: 4 additions & 1 deletion cmd/csi-resizer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ var (
resyncPeriod = flag.Duration("resync-period", time.Minute*10, "Resync period for cache")
workers = flag.Int("workers", 10, "Concurrency to process multiple resize requests")

extraModifyMetadata = flag.Bool("extra-modify-metadata", false, "If set, add pv/pvc metadata to plugin modify requests as parameters.")

csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
timeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")

Expand Down Expand Up @@ -188,6 +190,7 @@ func main() {
*timeout,
kubeClient,
informerFactory,
*extraModifyMetadata,
driverName)
if err != nil {
klog.ErrorS(err, "Failed to create CSI modifier")
Expand Down Expand Up @@ -217,7 +220,7 @@ func main() {
var mc modifycontroller.ModifyController
// Add modify controller only if the feature gate is enabled
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass) {
mc = modifycontroller.NewModifyController(modifierName, csiModifier, kubeClient, *resyncPeriod, informerFactory,
mc = modifycontroller.NewModifyController(modifierName, csiModifier, kubeClient, *resyncPeriod, *extraModifyMetadata, informerFactory,
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax))
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestController(t *testing.T) {
disableVolumeInUseErrorHandler: true,
},
} {
client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true)
client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true, false)
driverName, _ := client.GetDriverName(context.TODO())

var expectedCap resource.Quantity
Expand Down Expand Up @@ -380,7 +380,7 @@ func TestResizePVC(t *testing.T) {
},
} {
t.Run(test.Name, func(t *testing.T) {
client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true)
client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true, false)
if test.expansionError != nil {
client.SetExpansionError(test.expansionError)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/expand_and_recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestExpandAndRecover(t *testing.T) {
test := tests[i]
t.Run(test.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, true)
client := csi.NewMockClient("foo", !test.disableNodeExpansion, !test.disableControllerExpansion, false, true, true)
client := csi.NewMockClient("foo", !test.disableNodeExpansion, !test.disableControllerExpansion, false, true, true, false)
driverName, _ := client.GetDriverName(context.TODO())
if test.expansionError != nil {
client.SetExpansionError(test.expansionError)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/resize_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestResizeFunctions(t *testing.T) {
tc := test
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, true)
client := csi.NewMockClient("foo", true, true, false, true, true)
client := csi.NewMockClient("foo", true, true, false, true, true, false)
driverName, _ := client.GetDriverName(context.TODO())

pvc := test.pvc
Expand Down
6 changes: 5 additions & 1 deletion pkg/csi/mock_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ func NewMockClient(
supportsControllerResize bool,
supportsControllerModify bool,
supportsPluginControllerService bool,
supportsControllerSingleNodeMultiWriter bool) *MockClient {
supportsControllerSingleNodeMultiWriter bool,
supportsExtraModifyMetada bool,
) *MockClient {
return &MockClient{
name: name,
supportsNodeResize: supportsNodeResize,
supportsControllerResize: supportsControllerResize,
supportsControllerModify: supportsControllerModify,
supportsPluginControllerService: supportsPluginControllerService,
supportsControllerSingleNodeMultiWriter: supportsControllerSingleNodeMultiWriter,
extraModifyMetadata: supportsExtraModifyMetada,
}
}

Expand All @@ -40,6 +43,7 @@ type MockClient struct {
checkMigratedLabel bool
usedSecrets atomic.Pointer[map[string]string]
usedCapability atomic.Pointer[csi.VolumeCapability]
extraModifyMetadata bool
}

func (c *MockClient) GetDriverName(context.Context) (string, error) {
Expand Down
15 changes: 9 additions & 6 deletions pkg/modifier/csi_modifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func NewModifierFromClient(
timeout time.Duration,
k8sClient kubernetes.Interface,
informerFactory informers.SharedInformerFactory,
extraModifyMetadata bool,
driverName string) (Modifier, error) {

_, err := supportsControllerModify(csiClient, timeout)
Expand All @@ -41,18 +42,20 @@ func NewModifierFromClient(
}

return &csiModifier{
name: driverName,
client: csiClient,
timeout: timeout,
name: driverName,
client: csiClient,
timeout: timeout,
extraModifyMetadata: extraModifyMetadata,

k8sClient: k8sClient,
}, nil
}

type csiModifier struct {
name string
client csi.Client
timeout time.Duration
name string
client csi.Client
timeout time.Duration
extraModifyMetadata bool

k8sClient kubernetes.Interface
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/modifier/csi_modifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ func TestNewModifier(t *testing.T) {
SupportsControllerModify: false,
},
} {
client := csi.NewMockClient("mock", false, false, c.SupportsControllerModify, false, false)
client := csi.NewMockClient("mock", false, false, c.SupportsControllerModify, false, false, false)
driverName := "mock-driver"
k8sClient, informerFactory := fakeK8s()
_, err := NewModifierFromClient(client, 0, k8sClient, informerFactory, driverName)
_, err := NewModifierFromClient(client, 0, k8sClient, informerFactory, false, driverName)
if err != c.Error {
t.Errorf("Case %d: Unexpected error: wanted %v, got %v", i, c.Error, err)
}
Expand Down
47 changes: 25 additions & 22 deletions pkg/modifycontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,18 @@ type ModifyController interface {
}

type modifyController struct {
name string
modifier modifier.Modifier
kubeClient kubernetes.Interface
claimQueue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
pvLister corelisters.PersistentVolumeLister
pvListerSynced cache.InformerSynced
pvcLister corelisters.PersistentVolumeClaimLister
pvcListerSynced cache.InformerSynced
vacLister storagev1beta1listers.VolumeAttributesClassLister
vacListerSynced cache.InformerSynced
name string
modifier modifier.Modifier
kubeClient kubernetes.Interface
claimQueue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
pvLister corelisters.PersistentVolumeLister
pvListerSynced cache.InformerSynced
pvcLister corelisters.PersistentVolumeClaimLister
pvcListerSynced cache.InformerSynced
vacLister storagev1beta1listers.VolumeAttributesClassLister
vacListerSynced cache.InformerSynced
extraModifyMetadata bool
// the key of the map is {PVC_NAMESPACE}/{PVC_NAME}
uncertainPVCs map[string]v1.PersistentVolumeClaim
}
Expand All @@ -68,6 +69,7 @@ func NewModifyController(
modifier modifier.Modifier,
kubeClient kubernetes.Interface,
resyncPeriod time.Duration,
extraModifyMetadata bool,
informerFactory informers.SharedInformerFactory,
pvcRateLimiter workqueue.RateLimiter) ModifyController {
pvInformer := informerFactory.Core().V1().PersistentVolumes()
Expand All @@ -83,17 +85,18 @@ func NewModifyController(
pvcRateLimiter, fmt.Sprintf("%s-pvc", name))

ctrl := &modifyController{
name: name,
modifier: modifier,
kubeClient: kubeClient,
pvListerSynced: pvInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvcListerSynced: pvcInformer.Informer().HasSynced,
pvcLister: pvcInformer.Lister(),
vacListerSynced: vacInformer.Informer().HasSynced,
vacLister: vacInformer.Lister(),
claimQueue: claimQueue,
eventRecorder: eventRecorder,
name: name,
modifier: modifier,
kubeClient: kubeClient,
pvListerSynced: pvInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvcListerSynced: pvcInformer.Informer().HasSynced,
pvcLister: pvcInformer.Lister(),
vacListerSynced: vacInformer.Informer().HasSynced,
vacLister: vacInformer.Lister(),
claimQueue: claimQueue,
eventRecorder: eventRecorder,
extraModifyMetadata: extraModifyMetadata,
}
// Add a resync period as the PVC's request modify can be modified again when we handling
// a previous modify request of the same PVC.
Expand Down
12 changes: 6 additions & 6 deletions pkg/modifycontroller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestController(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Setup
client := csi.NewMockClient("foo", true, true, true, true, true)
client := csi.NewMockClient("foo", true, true, true, true, true, false)
driverName, _ := client.GetDriverName(context.TODO())

var initialObjects []runtime.Object
Expand All @@ -78,15 +78,15 @@ func TestController(t *testing.T) {
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses()

csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, driverName)
csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName)
if err != nil {
t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err)
}

featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true)
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, informerFactory,
time.Second, false, informerFactory,
workqueue.DefaultControllerRateLimiter())

ctrlInstance, _ := controller.(*modifyController)
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestModifyPVC(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := csi.NewMockClient("mock", true, true, true, true, true)
client := csi.NewMockClient("mock", true, true, true, true, true, false)
if test.modifyFailure {
client.SetModifyFailed()
}
Expand All @@ -179,15 +179,15 @@ func TestModifyPVC(t *testing.T) {
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses()

csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, driverName)
csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName)
if err != nil {
t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err)
}

featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true)
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, informerFactory,
time.Second, false, informerFactory,
workqueue.DefaultControllerRateLimiter())

ctrlInstance, _ := controller.(*modifyController)
Expand Down
24 changes: 12 additions & 12 deletions pkg/modifycontroller/modify_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestMarkControllerModifyVolumeStatus(t *testing.T) {
tc := test
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true)
client := csi.NewMockClient("foo", true, true, true, true, true)
client := csi.NewMockClient("foo", true, true, true, true, true, false)
driverName, _ := client.GetDriverName(context.TODO())

pvc := test.pvc
Expand All @@ -112,13 +112,13 @@ func TestMarkControllerModifyVolumeStatus(t *testing.T) {

kubeClient, informerFactory := fakeK8s(initialObjects)

csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, driverName)
csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName)
if err != nil {
t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err)
}
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, informerFactory,
time.Second, false, informerFactory,
workqueue.DefaultControllerRateLimiter())

ctrlInstance, _ := controller.(*modifyController)
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestUpdateConditionBasedOnError(t *testing.T) {
tc := test
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true)
client := csi.NewMockClient("foo", true, true, true, true, true)
client := csi.NewMockClient("foo", true, true, true, true, true, false)
driverName, _ := client.GetDriverName(context.TODO())

pvc := test.pvc
Expand All @@ -172,13 +172,13 @@ func TestUpdateConditionBasedOnError(t *testing.T) {

kubeClient, informerFactory := fakeK8s(initialObjects)

csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, driverName)
csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName)
if err != nil {
t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err)
}
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, informerFactory,
time.Second, false, informerFactory,
workqueue.DefaultControllerRateLimiter())

ctrlInstance, _ := controller.(*modifyController)
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestMarkControllerModifyVolumeCompleted(t *testing.T) {
tc := test
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true)
client := csi.NewMockClient("foo", true, true, true, true, true)
client := csi.NewMockClient("foo", true, true, true, true, true, false)
driverName, _ := client.GetDriverName(context.TODO())

var initialObjects []runtime.Object
Expand All @@ -240,13 +240,13 @@ func TestMarkControllerModifyVolumeCompleted(t *testing.T) {

kubeClient, informerFactory := fakeK8s(initialObjects)

csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, driverName)
csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName)
if err != nil {
t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err)
}
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, informerFactory,
time.Second, false, informerFactory,
workqueue.DefaultControllerRateLimiter())

ctrlInstance, _ := controller.(*modifyController)
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) {
tc := test
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true)
client := csi.NewMockClient("foo", true, true, true, true, true)
client := csi.NewMockClient("foo", true, true, true, true, true, false)
driverName, _ := client.GetDriverName(context.TODO())

var initialObjects []runtime.Object
Expand All @@ -306,13 +306,13 @@ func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) {
podInformer := informerFactory.Core().V1().Pods()
vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses()

csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, driverName)
csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName)
if err != nil {
t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err)
}
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, informerFactory,
time.Second, false, informerFactory,
workqueue.DefaultControllerRateLimiter())

ctrlInstance, _ := controller.(*modifyController)
Expand Down
11 changes: 11 additions & 0 deletions pkg/modifycontroller/modify_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ import (
"k8s.io/klog/v2"
)

const (
pvcNameKey = "csi.storage.k8s.io/pvc/name"
pvcNamespaceKey = "csi.storage.k8s.io/pvc/namespace"
pvNameKey = "csi.storage.k8s.io/pv/name"
)

// The return value bool is only used as a sentinel value when function returns without actually performing modification
func (ctrl *modifyController) modify(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, error, bool) {
pvcSpecVacName := pvc.Spec.VolumeAttributesClassName
Expand Down Expand Up @@ -148,6 +154,11 @@ func (ctrl *modifyController) callModifyVolumeOnPlugin(
pvc *v1.PersistentVolumeClaim,
pv *v1.PersistentVolume,
vac *storagev1beta1.VolumeAttributesClass) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, error) {
if ctrl.extraModifyMetadata {
vac.Parameters[pvcNameKey] = pvc.GetName()
vac.Parameters[pvcNamespaceKey] = pvc.GetNamespace()
vac.Parameters[pvNameKey] = pv.GetName()
}
err := ctrl.modifier.Modify(pv, vac.Parameters)

if err != nil {
Expand Down
Loading

0 comments on commit 7461a0d

Please sign in to comment.