-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(cleanup): add cleanup of volumes from node to monitor func #142
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,7 @@ import ( | |
"golang.org/x/sys/unix" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
k8serror "k8s.io/apimachinery/pkg/api/errors" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
) | ||
|
||
|
@@ -206,13 +207,13 @@ func (ns *node) NodeStageVolume( | |
return nil, status.Error(codes.Internal, err.Error()) | ||
} | ||
|
||
logrus.Info("NodeStageVolume: start format and mount operation") | ||
logrus.Infof("NodeStageVolume %v: start format and mount operation", volumeID) | ||
if err := ns.formatAndMount(req, devicePath); err != nil { | ||
vol.Finalizers = nil | ||
// There might still be a case that the attach was successful, | ||
// therefore not cleaning up the staging path from CR | ||
if _, uerr := utils.UpdateCStorVolumeAttachmentCR(vol); uerr != nil { | ||
logrus.Errorf("Failed to update CStorVolumeAttachment:%v", uerr.Error()) | ||
logrus.Errorf("Failed to update CStorVolumeAttachment for %s: %v", volumeID, uerr.Error()) | ||
} | ||
return nil, status.Error(codes.Internal, err.Error()) | ||
} | ||
|
@@ -250,8 +251,12 @@ func (ns *node) NodeUnstageVolume( | |
defer removeVolumeFromTransitionList(volumeID) | ||
|
||
if vol, err = utils.GetCStorVolumeAttachment(volumeID + "-" + utils.NodeIDENV); err != nil { | ||
if k8serror.IsNotFound(err) { | ||
return &csi.NodeUnstageVolumeResponse{}, nil | ||
} | ||
return nil, status.Error(codes.Internal, err.Error()) | ||
} | ||
|
||
if vol.Spec.Volume.StagingTargetPath == "" { | ||
return &csi.NodeUnstageVolumeResponse{}, nil | ||
} | ||
|
@@ -359,6 +364,9 @@ func (ns *node) NodeUnpublishVolume( | |
} | ||
vol, err := utils.GetCStorVolumeAttachment(volumeID + "-" + utils.NodeIDENV) | ||
if err != nil { | ||
if k8serror.IsNotFound(err) { | ||
return &csi.NodeUnpublishVolumeResponse{}, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we have a log here saying CStorVolumeAttachment not found
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks, I will add this log. Besides, I will also add logs while creating and deleting CVAs |
||
} | ||
return nil, status.Error(codes.Internal, err.Error()) | ||
} | ||
vol.Spec.Volume.TargetPath = "" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ import ( | |
|
||
"golang.org/x/net/context" | ||
|
||
"github.com/google/uuid" | ||
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer" | ||
apis "github.com/openebs/cstor-csi/pkg/apis/cstor/v1" | ||
"github.com/openebs/cstor-csi/pkg/cstor/snapshot" | ||
|
@@ -116,13 +117,27 @@ func logGRPC( | |
ctx context.Context, req interface{}, | ||
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, | ||
) (interface{}, error) { | ||
logrus.Debugf("GRPC call: %s", info.FullMethod) | ||
logrus.Debugf("GRPC request: %s", protosanitizer.StripSecrets(req)) | ||
|
||
var printlog bool | ||
id := uuid.New() | ||
if strings.Contains(info.FullMethod, "NodeGetCapabilities") || | ||
strings.Contains(info.FullMethod, "NodeGetVolumeStats") || | ||
strings.Contains(info.FullMethod, "ControllerGetCapabilities") || | ||
strings.Contains(info.FullMethod, "GetPluginInfo") || | ||
strings.Contains(info.FullMethod, "GetPluginCapabilities") || | ||
strings.Contains(info.FullMethod, "Probe") { | ||
printlog = true | ||
} | ||
if !printlog { | ||
logrus.Infof("Req %s: %s %s", id, info.FullMethod, protosanitizer.StripSecrets(req)) | ||
} | ||
resp, err := handler(ctx, req) | ||
if err != nil { | ||
logrus.Errorf("GRPC error: %v", err) | ||
} else { | ||
logrus.Infof("GRPC response: %s", protosanitizer.StripSecrets(resp)) | ||
if !printlog { | ||
logrus.Infof("Resp %s: %s", id, protosanitizer.StripSecrets(resp)) | ||
} | ||
} | ||
return resp, err | ||
} | ||
|
@@ -135,7 +150,7 @@ func ChmodMountPath(mountPath string) error { | |
|
||
// WaitForVolumeToBeReachable keeps the mounts on hold until the volume is | ||
// reachable | ||
func WaitForVolumeToBeReachable(targetPortal string) error { | ||
func WaitForVolumeToBeReachable(volumeID string, targetPortal string) error { | ||
var ( | ||
retries int | ||
err error | ||
|
@@ -146,7 +161,7 @@ func WaitForVolumeToBeReachable(targetPortal string) error { | |
// Create a connection to test if the iSCSI Portal is reachable, | ||
if conn, err = net.Dial("tcp", targetPortal); err == nil { | ||
conn.Close() | ||
logrus.Infof("Volume is reachable to create connections") | ||
logrus.Infof("Volume %s is reachable to create connections", volumeID) | ||
return nil | ||
} | ||
// wait until the iSCSI targetPortal is reachable | ||
|
@@ -160,8 +175,8 @@ func WaitForVolumeToBeReachable(targetPortal string) error { | |
// based on the kubelets retrying logic. Kubelet retries to publish | ||
// volume after every 14s ) | ||
return fmt.Errorf( | ||
"iSCSI Target not reachable, TargetPortal %v, err:%v", | ||
targetPortal, err) | ||
"iSCSI Target not reachable for %s, TargetPortal %v, err:%v", | ||
volumeID, targetPortal, err) | ||
} | ||
} | ||
} | ||
|
@@ -177,14 +192,15 @@ checkVolumeStatus: | |
return err | ||
} else if volStatus == "Healthy" || volStatus == "Degraded" { | ||
// In both healthy and degraded states the volume can serve IOs | ||
logrus.Infof("Volume is ready to accept IOs") | ||
logrus.Infof("Volume %s is ready to accept IOs", volumeID) | ||
} else if retries >= VolumeWaitRetryCount { | ||
// Let the caller function decide further if the volume is still not | ||
// ready to accept IOs after 12 seconds ( This number was arrived at | ||
// based on the kubelets retrying logic. Kubelet retries to publish | ||
// volume after every 14s ) | ||
return fmt.Errorf( | ||
"Volume is not ready: Replicas yet to connect to controller", | ||
"Volume %s is not ready: Replicas yet to connect to controller", | ||
volumeID, | ||
) | ||
} else { | ||
TransitionVolList[volumeID] = apis.CStorVolumeAttachmentStatusWaitingForVolumeToBeReady | ||
|
@@ -238,6 +254,7 @@ func MonitorMounts() { | |
mounter := mount.New("") | ||
ticker := time.NewTicker(MonitorMountRetryTimeout * time.Second) | ||
for { | ||
cleanupRequired := false | ||
select { | ||
case <-ticker.C: | ||
// Get list of mounted paths present with the node | ||
|
@@ -253,6 +270,7 @@ func MonitorMounts() { | |
for _, vol := range csivolList.Items { | ||
// ignore monitoring for volumes with deletion timestamp set | ||
if vol.DeletionTimestamp != nil { | ||
cleanupRequired = true | ||
continue | ||
} | ||
// ignore monitoring the mount for a block device | ||
|
@@ -318,22 +336,32 @@ func MonitorMounts() { | |
} | ||
TransitionVolListLock.Unlock() | ||
} | ||
if cleanupRequired { | ||
Cleanup() | ||
} | ||
} | ||
} | ||
|
||
// CleanupOnRestart unmounts and detaches the volumes having | ||
// Cleanup unmounts and detaches the volumes having | ||
// DeletionTimestamp set and removes finalizers from the | ||
// corresponding CStorVolumeAttachment CRs | ||
func CleanupOnRestart() { | ||
func Cleanup() (err error) { | ||
var ( | ||
err error | ||
csivolList *apis.CStorVolumeAttachmentList | ||
) | ||
// Get list of mounted paths present with the node | ||
TransitionVolListLock.Lock() | ||
defer TransitionVolListLock.Unlock() | ||
if csivolList, err = GetVolListForNode(); err != nil { | ||
return | ||
count := 0 | ||
for { | ||
if csivolList, err = GetVolListForNode(); err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this errors out more than 5 times, then the list will be empty and the error will not be logged either. Can we log the error outside the for loop in that case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it errors 5 times, we are returning an error to the caller function. If this func returns an error during startup, we will fatal out; else we ignore the error. Will add one extra log if it errors 5 times. |
||
break | ||
} | ||
time.Sleep(time.Second) | ||
count++ | ||
if count == 5 { | ||
return | ||
} | ||
} | ||
for _, Vol := range csivolList.Items { | ||
if Vol.DeletionTimestamp == nil { | ||
|
@@ -344,6 +372,7 @@ func CleanupOnRestart() { | |
// This is being run in a go routine so that if unmount and detach | ||
// commands take time, the startup is not delayed | ||
go func(vol *apis.CStorVolumeAttachment) { | ||
logrus.Infof("Cleaning up %s from node", vol.Spec.Volume.Name) | ||
if err := iscsiutils.UnmountAndDetachDisk(vol, vol.Spec.Volume.StagingTargetPath); err == nil { | ||
vol.Finalizers = nil | ||
if vol, err = UpdateCStorVolumeAttachmentCR(vol); err != nil { | ||
|
@@ -354,16 +383,16 @@ func CleanupOnRestart() { | |
} | ||
|
||
TransitionVolListLock.Lock() | ||
TransitionVolList[vol.Spec.Volume.Name] = apis.CStorVolumeAttachmentStatusUnmounted | ||
delete(TransitionVolList, vol.Spec.Volume.Name) | ||
TransitionVolListLock.Unlock() | ||
}(&vol) | ||
} | ||
return | ||
} | ||
|
||
// IsVolumeReachable makes a TCP connection to target | ||
// and checks if volume is Reachable | ||
func IsVolumeReachable(targetPortal string) (bool, error) { | ||
func IsVolumeReachable(volumeID, targetPortal string) (bool, error) { | ||
var ( | ||
err error | ||
conn net.Conn | ||
|
@@ -372,12 +401,12 @@ func IsVolumeReachable(targetPortal string) (bool, error) { | |
// Create a connection to test if the iSCSI Portal is reachable, | ||
if conn, err = net.Dial("tcp", targetPortal); err == nil { | ||
conn.Close() | ||
logrus.Infof("Volume is reachable to create connections") | ||
logrus.Infof("Volume %s is reachable to create connections", volumeID) | ||
return true, nil | ||
} | ||
logrus.Infof( | ||
"iSCSI Target not reachable, TargetPortal %v, err:%v", | ||
targetPortal, err, | ||
"iSCSI Target not reachable, VolumeID: %s TargetPortal %v, err:%v", | ||
volumeID, targetPortal, err, | ||
) | ||
return false, err | ||
} | ||
|
@@ -391,10 +420,10 @@ func IsVolumeReady(volumeID string) (bool, error) { | |
} | ||
if volStatus == "Healthy" || volStatus == "Degraded" { | ||
// In both healthy and degraded states the volume can serve IOs | ||
logrus.Infof("Volume is ready to accept IOs") | ||
logrus.Infof("Volume %s is ready to accept IOs", volumeID) | ||
return true, nil | ||
} | ||
logrus.Infof("Volume is not ready: Replicas yet to connect to controller") | ||
logrus.Infof("Volume %s is not ready: Replicas yet to connect to controller", volumeID) | ||
return false, nil | ||
} | ||
|
||
|
@@ -410,7 +439,7 @@ func WaitForVolumeReadyAndReachable(vol *apis.CStorVolumeAttachment) error { | |
return err | ||
} | ||
// This function return after 12s in case the volume is not reachable | ||
err = WaitForVolumeToBeReachable(vol.Spec.ISCSI.TargetPortal) | ||
err = WaitForVolumeToBeReachable(vol.Spec.Volume.Name, vol.Spec.ISCSI.TargetPortal) | ||
if err != nil { | ||
logrus.Error(err) | ||
return err | ||
|
@@ -438,10 +467,10 @@ func RemountVolume( | |
options := []string{"rw"} | ||
|
||
if ready, err := IsVolumeReady(vol.Spec.Volume.Name); err != nil || !ready { | ||
return fmt.Errorf("Volume is not ready") | ||
return fmt.Errorf("Volume %s is not ready", vol.Spec.Volume.Name) | ||
} | ||
if reachable, err := IsVolumeReachable(vol.Spec.ISCSI.TargetPortal); err != nil || !reachable { | ||
return fmt.Errorf("Volume is not reachable") | ||
if reachable, err := IsVolumeReachable(vol.Spec.Volume.Name, vol.Spec.ISCSI.TargetPortal); err != nil || !reachable { | ||
return fmt.Errorf("Volume %s is not reachable", vol.Spec.Volume.Name) | ||
} | ||
if stagingPathExists { | ||
mounter.Unmount(vol.Spec.Volume.StagingTargetPath) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we have a log here before return saying CStorVolumeAttachment not found