Skip to content

Commit

Permalink
Fixes ReconcileVA for non-migratable drivers
Browse files Browse the repository at this point in the history
  • Loading branch information
yuga711 committed Aug 12, 2020
1 parent 298cb96 commit 869dc86
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 40 deletions.
28 changes: 26 additions & 2 deletions pkg/controller/csi_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,19 @@ func (h *csiHandler) ReconcileVA() error {
}
attachedStatus := va.Status.Attached

volumeHandle, err = h.translator.RepairVolumeHandle(source.Driver, volumeHandle, nodeID)
// If volume driver has corresponding in-tree plugin, generate a correct volumehandle
isMig, err := h.isMigratable(va)
if err != nil {
klog.Warningf("Failed to repair volume handle %s for driver %s: %v", volumeHandle, source.Driver, err)
klog.Warningf("Failed to check if migratable: %v", err)
continue
}
if isMig {
volumeHandle, err = h.translator.RepairVolumeHandle(source.Driver, volumeHandle, nodeID)
if err != nil {
klog.Warningf("Failed to repair volume handle %s for driver %s: %v", volumeHandle, source.Driver, err)
continue
}
}

// Check whether the volume is published to this node
found := false
Expand Down Expand Up @@ -354,6 +362,22 @@ func (h *csiHandler) hasVAFinalizer(va *storage.VolumeAttachment) bool {
return false
}

// Checks if the PV (or) the inline-volume corresponding to the VA could have migrated from
// in-tree to CSI.
func (h *csiHandler) isMigratable(va *storage.VolumeAttachment) (bool, error) {
if va.Spec.Source.PersistentVolumeName != nil {
pv, err := h.pvLister.Get(*va.Spec.Source.PersistentVolumeName)
if err != nil {
return false, err
}
return h.translator.IsPVMigratable(pv), nil
} else if va.Spec.Source.InlineVolumeSpec != nil {
return va.Spec.Source.InlineVolumeSpec.CSI != nil, nil
} else {
return false, nil
}
}

func getCSISource(pvSpec *v1.PersistentVolumeSpec) (*v1.CSIPersistentVolumeSource, error) {
if pvSpec == nil {
return nil, errors.New("could not get CSI source, pv spec was nil")
Expand Down
55 changes: 42 additions & 13 deletions pkg/controller/csi_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
core "k8s.io/client-go/testing"
csitranslator "k8s.io/csi-translation-lib"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -61,7 +62,7 @@ func csiHandlerFactory(client kubernetes.Interface, informerFactory informers.Sh
informerFactory.Storage().V1().VolumeAttachments().Lister(),
&timeout,
true, /* supports PUBLISH_READONLY */
fakeInTreeToCSITranslator{},
csitranslator.New(),
)
}

Expand All @@ -76,7 +77,7 @@ func csiHandlerFactoryNoReadOnly(client kubernetes.Interface, informerFactory in
informerFactory.Storage().V1().VolumeAttachments().Lister(),
&timeout,
false, /* does not support PUBLISH_READONLY */
fakeInTreeToCSITranslator{},
csitranslator.New(),
)
}

Expand Down Expand Up @@ -696,7 +697,7 @@ func TestCSIHandler(t *testing.T) {
{
name: "unknown node -> error",
initialObjects: []runtime.Object{pvWithFinalizer()},
addedVA: va(false, fin, ann),
addedVA: va(false, fin, nil),
expectedActions: []core.Action{
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false, fin, ann), vaWithAttachError(va(false, fin, ann),
Expand Down Expand Up @@ -833,8 +834,17 @@ func TestCSIHandler(t *testing.T) {
core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false /*attached*/, "" /*finalizer*/, nil /* annotations */),
va(false /*attached*/, fin, ann))),
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false /*attached*/, fin, ann),
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone,
testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false /*attached*/, fin /*finalizer*/, ann /* annotations */),
vaWithAttachError(va(false, fin, ann), "context deadline exceeded")),
"status"),
core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false /*attached*/, "" /*finalizer*/, nil /* annotations */),
va(false /*attached*/, fin, ann))),
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone,
testPVName+"-"+testNodeName,
types.MergePatchType, patch(vaWithAttachError(va(false, fin, ann), "context deadline exceeded"),
va(true /*attached*/, fin, ann)), "status"),
},
expectedCSICalls: []csiCall{
Expand All @@ -845,7 +855,7 @@ func TestCSIHandler(t *testing.T) {
{
name: "Node without CSINode -> error",
initialObjects: []runtime.Object{pvWithFinalizer(), node()},
addedVA: va(false, fin, ann),
addedVA: va(false, fin, nil),
expectedActions: []core.Action{
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false /*attached*/, fin, ann),
Expand All @@ -855,7 +865,7 @@ func TestCSIHandler(t *testing.T) {
{
name: "Node with annotations, CSINode is absent -> error",
initialObjects: []runtime.Object{pvWithFinalizer(), nodeWithAnnotations()},
addedVA: va(false, fin, ann),
addedVA: va(false, fin, nil),
expectedActions: []core.Action{
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false /*attached*/, fin, ann),
Expand Down Expand Up @@ -1059,6 +1069,10 @@ func TestCSIHandler(t *testing.T) {
initialObjects: []runtime.Object{pvWithFinalizer(), csiNode()},
addedVA: deleted(va(true, fin, ann)),
expectedActions: []core.Action{
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone,
testPVName+"-"+testNodeName,
types.MergePatchType, patch(deleted(va(true, "", ann)),
deleted(vaWithDetachError(va(true, "", ann), "context deadline exceeded"))), "status"),
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(deleted(va(true, "", ann)),
deleted(va(false /*attached*/, "", ann))), "status"),
Expand Down Expand Up @@ -1381,19 +1395,33 @@ func TestCSIHandler(t *testing.T) {

func TestCSIHandlerReconcileVA(t *testing.T) {
nID := map[string]string{
vaNodeIDAnnotation: testNodeName,
vaNodeIDAnnotation: testNodeID,
}
vaGroupResourceVersion := schema.GroupVersionResource{
Group: storage.GroupName,
Version: "v1",
Resource: "volumeattachments",
}

tests := []testCase{
{
name: "va attached actual state not attached",
initialObjects: []runtime.Object{
va(true /*attached*/, "" /*finalizer*/, nID /*annotations*/),
pvWithFinalizer(),
csiNode(),
},
listerResponse: map[string][]string{
// Intentionally empty
},
expectedActions: []core.Action{
// Finalizer is saved first
core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false /*attached*/, "" /*finalizer*/, nil /* annotations */),
va(false /*attached*/, fin, nil))),
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false /*attached*/, "", nil),
va(false, "", nil)), "status"),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, nil, nil, false, nil, false, nil, 0},
},
Expand All @@ -1405,7 +1433,7 @@ func TestCSIHandlerReconcileVA(t *testing.T) {
pvWithFinalizer(),
},
listerResponse: map[string][]string{
testVolumeHandle: []string{testNodeName},
testVolumeHandle: []string{testNodeID},
},
expectedActions: []core.Action{
// Intentionally empty
Expand All @@ -1414,11 +1442,12 @@ func TestCSIHandlerReconcileVA(t *testing.T) {
{
name: "va not attached actual state attached",
initialObjects: []runtime.Object{
va(false /*attached*/, "" /*finalizer*/, nID /*annotations*/),
deleted(va(false /*attached*/, "" /*finalizer*/, nID /*annotations*/)),
// va(false /*attached*/, "" /*finalizer*/, nID /*annotations*/),
pvWithFinalizer(),
},
listerResponse: map[string][]string{
testVolumeHandle: []string{testNodeName},
testVolumeHandle: []string{testNodeID},
},
expectedActions: []core.Action{},
expectedCSICalls: []csiCall{
Expand All @@ -1429,7 +1458,7 @@ func TestCSIHandlerReconcileVA(t *testing.T) {
name: "no volume attachments but existing lister response results in no action",
initialObjects: []runtime.Object{},
listerResponse: map[string][]string{
testVolumeHandle: []string{testNodeName},
testVolumeHandle: []string{testNodeID},
},
expectedActions: []core.Action{},
},
Expand Down
71 changes: 46 additions & 25 deletions pkg/controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/util/workqueue"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -178,8 +177,8 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {
}

// Construct controller
csiConnection := &fakeCSIConnection{t: t, calls: test.expectedCSICalls}
lister := &fakeLister{t: t, publishedNodes: test.listerResponse}
csiConnection := &fakeCSIConnection{t: t, calls: test.expectedCSICalls, lister: lister}
handler := handlerFactory(client, informers, csiConnection, lister)
ctrl := NewCSIAttachController(client, testAttacherName, handler, vaInformer, pvInformer, workqueue.DefaultControllerRateLimiter(), workqueue.DefaultControllerRateLimiter(), test.listerResponse != nil, 1*time.Minute)

Expand All @@ -200,6 +199,7 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {
// Process the queue until we get expected results
timeout := time.Now().Add(10 * time.Second)
lastReportedActionCount := 0
firstIter := false
for {
if time.Now().After(timeout) {
t.Errorf("Test %q: timed out", test.name)
Expand All @@ -224,6 +224,11 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {
t.Errorf("Failed to reconcile Volume Attachment objects: %v", err)
}
}
if !firstIter && (ctrl.vaQueue.Len() > 0 || ctrl.pvQueue.Len() > 0) {
// Reconciler created some work, process the queues once again
firstIter = true
continue
}
currentActionCount := len(client.Actions())
if currentActionCount < len(test.expectedActions) {
if lastReportedActionCount < currentActionCount {
Expand Down Expand Up @@ -297,6 +302,13 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {
if test.additionalCheck != nil {
test.additionalCheck(t, test)
}
// makesure all the csi calls were executed.
if csiConnection.index < len(csiConnection.calls) {
t.Errorf("Test %q: %d additional expected CSI calls", test.name, len(csiConnection.calls)-csiConnection.index)
for _, a := range csiConnection.calls[csiConnection.index:] {
t.Logf(" %+v", a)
}
}
klog.Infof("Test %q: finished \n\n", test.name)
}
}
Expand Down Expand Up @@ -400,12 +412,25 @@ func (l *fakeLister) ListVolumes(ctx context.Context) (map[string][]string, erro
return l.publishedNodes, nil
}

func (l *fakeLister) Add(volumeHandle string, nodeID string) {
if l.publishedNodes != nil {
l.publishedNodes[volumeHandle] = []string{nodeID}
}
}

func (l *fakeLister) Delete(volumeHandle string, nodeID string) {
if l.publishedNodes != nil {
delete(l.publishedNodes, volumeHandle)
}
}

// Fake CSIConnection implementation that check that Attach/Detach is called
// with the right parameters and it returns proper error code and metadata.
type fakeCSIConnection struct {
calls []csiCall
index int
t *testing.T
calls []csiCall
index int
lister *fakeLister
t *testing.T
}

func (f *fakeCSIConnection) GetDriverName(ctx context.Context) (string, error) {
Expand All @@ -429,9 +454,12 @@ func (f *fakeCSIConnection) Attach(ctx context.Context, volumeID string, readOnl
call := f.calls[f.index]
f.index++

// Force a delay
if call.delay != time.Duration(0) {
time.Sleep(call.delay)
// If caller has set long delay, return when deadline expires
select {
case <-ctx.Done():
return nil, true, ctx.Err()
case <-time.After(call.delay):
break
}

var err error
Expand Down Expand Up @@ -465,6 +493,8 @@ func (f *fakeCSIConnection) Attach(ctx context.Context, volumeID string, readOnl
if err != nil {
return nil, true, err
}
// Update the published volume map
f.lister.Add(call.volumeHandle, call.nodeID)
return call.metadata, call.detached, call.err
}

Expand All @@ -476,9 +506,12 @@ func (f *fakeCSIConnection) Detach(ctx context.Context, volumeID string, nodeID
call := f.calls[f.index]
f.index++

// Force a delay
if call.delay != time.Duration(0) {
time.Sleep(call.delay)
// If caller has set long delay, return when deadline expires
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(call.delay):
break
}

var err error
Expand All @@ -504,6 +537,8 @@ func (f *fakeCSIConnection) Detach(ctx context.Context, volumeID string, nodeID
if err != nil {
return err
}
// Update the published volume map
f.lister.Delete(call.volumeHandle, call.nodeID)
return call.err
}

Expand All @@ -514,17 +549,3 @@ func (f *fakeCSIConnection) Close() error {
func (f *fakeCSIConnection) Probe(timeout time.Duration) error {
return nil
}

// TODO: Remove hardcoding for GCE tests and make more general
type fakeInTreeToCSITranslator struct{}

func (f fakeInTreeToCSITranslator) TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
t := csitrans.New()
return t.TranslateInTreePVToCSI(pv)
}
func (f fakeInTreeToCSITranslator) IsPVMigratable(pv *v1.PersistentVolume) bool {
return pv.Spec.GCEPersistentDisk != nil
}
func (f fakeInTreeToCSITranslator) RepairVolumeHandle(pluginName, volumeHandle, nodeID string) (string, error) {
return volumeHandle, nil
}

0 comments on commit 869dc86

Please sign in to comment.