Skip to content

Commit

Permalink
fix(cleanup): add cleanup of volumes from node to monitor func (#142)
Browse files Browse the repository at this point in the history
Signed-off-by: Payes Anand <[email protected]>
  • Loading branch information
payes authored Jan 11, 2021
1 parent adaaa44 commit 313ddbc
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 36 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/gogo/protobuf v1.3.0 // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
github.com/googleapis/gnostic v0.3.1 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.8 // indirect
Expand Down
14 changes: 12 additions & 2 deletions pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"golang.org/x/sys/unix"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
k8serror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -206,13 +207,13 @@ func (ns *node) NodeStageVolume(
return nil, status.Error(codes.Internal, err.Error())
}

logrus.Info("NodeStageVolume: start format and mount operation")
logrus.Infof("NodeStageVolume %v: start format and mount operation", volumeID)
if err := ns.formatAndMount(req, devicePath); err != nil {
vol.Finalizers = nil
// There might still be a case that the attach was successful,
// therefore not cleaning up the staging path from CR
if _, uerr := utils.UpdateCStorVolumeAttachmentCR(vol); uerr != nil {
logrus.Errorf("Failed to update CStorVolumeAttachment:%v", uerr.Error())
logrus.Errorf("Failed to update cva for %s: %v", volumeID, uerr.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down Expand Up @@ -250,8 +251,13 @@ func (ns *node) NodeUnstageVolume(
defer removeVolumeFromTransitionList(volumeID)

if vol, err = utils.GetCStorVolumeAttachment(volumeID + "-" + utils.NodeIDENV); err != nil {
if k8serror.IsNotFound(err) {
logrus.Infof("cva for %s has already been deleted", volumeID)
return &csi.NodeUnstageVolumeResponse{}, nil
}
return nil, status.Error(codes.Internal, err.Error())
}

if vol.Spec.Volume.StagingTargetPath == "" {
return &csi.NodeUnstageVolumeResponse{}, nil
}
Expand Down Expand Up @@ -359,6 +365,10 @@ func (ns *node) NodeUnpublishVolume(
}
vol, err := utils.GetCStorVolumeAttachment(volumeID + "-" + utils.NodeIDENV)
if err != nil {
if k8serror.IsNotFound(err) {
logrus.Infof("cva for %s has already been deleted", volumeID)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
return nil, status.Error(codes.Internal, err.Error())
}
vol.Spec.Volume.TargetPath = ""
Expand Down
8 changes: 2 additions & 6 deletions pkg/driver/node_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,6 @@ func (ns *node) prepareVolumeForNode(
volumeID := req.GetVolumeId()
nodeID := ns.driver.config.NodeID

if err := utils.PatchCVCNodeID(volumeID, nodeID); err != nil {
return err
}

labels := map[string]string{
"nodeID": nodeID,
"Volname": volumeID,
Expand Down Expand Up @@ -356,7 +352,7 @@ func (ns *node) prepareVolumeForNode(
} else if !isCVCBound {
utils.TransitionVolList[volumeID] = apis.CStorVolumeAttachmentStatusWaitingForCVCBound
time.Sleep(10 * time.Second)
return errors.New("Waiting for CVC to be bound")
return errors.Errorf("Waiting for %s's CVC to be bound", volumeID)
}

if err = utils.FetchAndUpdateISCSIDetails(volumeID, vol); err != nil {
Expand All @@ -368,7 +364,7 @@ func (ns *node) prepareVolumeForNode(
return err
} else if err == nil && oldvol != nil {
if oldvol.DeletionTimestamp != nil {
return errors.Errorf("Volume still mounted on node: %s", nodeID)
return errors.Errorf("Volume %s still mounted on node: %s", volumeID, nodeID)
}
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/driver/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ func New(config *config.Config) *CSIDriver {
driver.cs = NewController(driver)

case "node":
utils.CleanupOnRestart()
if err := utils.Cleanup(); err != nil {
logrus.Fatalf(err.Error())
}
// Start monitor goroutine to monitor the
// mounted paths. If a path goes down or
// becomes read only (in case of RW mount
Expand Down
4 changes: 2 additions & 2 deletions pkg/driver/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func addVolumeToTransitionList(volumeID string, status apis.CStorVolumeAttachmen
defer utils.TransitionVolListLock.Unlock()

if _, ok := utils.TransitionVolList[volumeID]; ok {
return fmt.Errorf("Volume Busy, status: %v",
utils.TransitionVolList[volumeID])
return fmt.Errorf("Volume %s Busy, status: %v",
volumeID, utils.TransitionVolList[volumeID])
}
utils.TransitionVolList[volumeID] = status
return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/utils/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
node "github.com/openebs/cstor-csi/pkg/kubernetes/node"
pv "github.com/openebs/cstor-csi/pkg/kubernetes/persistentvolume"
errors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -151,6 +152,7 @@ func DeleteOldCStorVolumeAttachmentCRs(volumeID string) error {
}

for _, csivol := range csivols.Items {
logrus.Infof("Marking cva %s for deletion", csivol.Name)
err = csivolume.NewKubeclient().
WithNamespace(OpenEBSNamespace).Delete(csivol.Name)
if err != nil {
Expand All @@ -165,6 +167,7 @@ func DeleteOldCStorVolumeAttachmentCRs(volumeID string) error {

// DeleteCStorVolumeAttachmentCR removes the CStorVolumeAttachmentCR for the specified path
func DeleteCStorVolumeAttachmentCR(csivolName string) error {
	logrus.Infof("Deleting cva %s", csivolName)
	// Build a client scoped to the OpenEBS namespace and issue the delete.
	client := csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace)
	return client.Delete(csivolName)
}
81 changes: 56 additions & 25 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"golang.org/x/net/context"

"github.com/google/uuid"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
apis "github.com/openebs/cstor-csi/pkg/apis/cstor/v1"
"github.com/openebs/cstor-csi/pkg/cstor/snapshot"
Expand Down Expand Up @@ -116,13 +117,27 @@ func logGRPC(
ctx context.Context, req interface{},
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (interface{}, error) {
logrus.Debugf("GRPC call: %s", info.FullMethod)
logrus.Debugf("GRPC request: %s", protosanitizer.StripSecrets(req))

var printlog bool
id := uuid.New()
if strings.Contains(info.FullMethod, "NodeGetCapabilities") ||
strings.Contains(info.FullMethod, "NodeGetVolumeStats") ||
strings.Contains(info.FullMethod, "ControllerGetCapabilities") ||
strings.Contains(info.FullMethod, "GetPluginInfo") ||
strings.Contains(info.FullMethod, "GetPluginCapabilities") ||
strings.Contains(info.FullMethod, "Probe") {
printlog = true
}
if !printlog {
logrus.Infof("Req %s: %s %s", id, info.FullMethod, protosanitizer.StripSecrets(req))
}
resp, err := handler(ctx, req)
if err != nil {
logrus.Errorf("GRPC error: %v", err)
} else {
logrus.Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
if !printlog {
logrus.Infof("Resp %s: %s", id, protosanitizer.StripSecrets(resp))
}
}
return resp, err
}
Expand All @@ -135,7 +150,7 @@ func ChmodMountPath(mountPath string) error {

// WaitForVolumeToBeReachable keeps the mounts on hold until the volume is
// reachable
func WaitForVolumeToBeReachable(targetPortal string) error {
func WaitForVolumeToBeReachable(volumeID string, targetPortal string) error {
var (
retries int
err error
Expand All @@ -146,7 +161,7 @@ func WaitForVolumeToBeReachable(targetPortal string) error {
// Create a connection to test if the iSCSI Portal is reachable,
if conn, err = net.Dial("tcp", targetPortal); err == nil {
conn.Close()
logrus.Infof("Volume is reachable to create connections")
logrus.Infof("Volume %s is reachable to create connections", volumeID)
return nil
}
// wait until the iSCSI targetPortal is reachable
Expand All @@ -160,8 +175,8 @@ func WaitForVolumeToBeReachable(targetPortal string) error {
// based on the kubelets retrying logic. Kubelet retries to publish
// volume after every 14s )
return fmt.Errorf(
"iSCSI Target not reachable, TargetPortal %v, err:%v",
targetPortal, err)
"iSCSI Target not reachable for %s, TargetPortal %v, err:%v",
volumeID, targetPortal, err)
}
}
}
Expand All @@ -177,14 +192,15 @@ checkVolumeStatus:
return err
} else if volStatus == "Healthy" || volStatus == "Degraded" {
// In both healthy and degraded states the volume can serve IOs
logrus.Infof("Volume is ready to accept IOs")
logrus.Infof("Volume %s is ready to accept IOs", volumeID)
} else if retries >= VolumeWaitRetryCount {
// Let the caller function decide further if the volume is still not
// ready to accept IOs after 12 seconds (this number was arrived at
// based on the kubelets retrying logic. Kubelet retries to publish
// volume after every 14s )
return fmt.Errorf(
"Volume is not ready: Replicas yet to connect to controller",
"Volume %s is not ready: Replicas yet to connect to controller",
volumeID,
)
} else {
TransitionVolList[volumeID] = apis.CStorVolumeAttachmentStatusWaitingForVolumeToBeReady
Expand Down Expand Up @@ -238,6 +254,7 @@ func MonitorMounts() {
mounter := mount.New("")
ticker := time.NewTicker(MonitorMountRetryTimeout * time.Second)
for {
cleanupRequired := false
select {
case <-ticker.C:
// Get list of mounted paths present with the node
Expand All @@ -253,6 +270,7 @@ func MonitorMounts() {
for _, vol := range csivolList.Items {
// ignore monitoring for volumes with deletion timestamp set
if vol.DeletionTimestamp != nil {
cleanupRequired = true
continue
}
// ignore monitoring the mount for a block device
Expand Down Expand Up @@ -318,22 +336,33 @@ func MonitorMounts() {
}
TransitionVolListLock.Unlock()
}
if cleanupRequired {
Cleanup()
}
}
}

// CleanupOnRestart unmounts and detaches the volumes having
// Cleanup unmounts and detaches the volumes having
// DeletionTimestamp set and removes finalizers from the
// corresponding CStorVolumeAttachment CRs
func CleanupOnRestart() {
func Cleanup() (err error) {
var (
err error
csivolList *apis.CStorVolumeAttachmentList
)
// Get list of mounted paths present with the node
TransitionVolListLock.Lock()
defer TransitionVolListLock.Unlock()
if csivolList, err = GetVolListForNode(); err != nil {
return
count := 0
for {
if csivolList, err = GetVolListForNode(); err == nil {
break
}
time.Sleep(time.Second)
count++
if count == 5 {
logrus.Errorf("Failed to get cva list, err: %v", err)
return
}
}
for _, Vol := range csivolList.Items {
if Vol.DeletionTimestamp == nil {
Expand All @@ -344,8 +373,10 @@ func CleanupOnRestart() {
// This is being run in a go routine so that if unmount and detach
// commands take time, the startup is not delayed
go func(vol *apis.CStorVolumeAttachment) {
logrus.Infof("Cleaning up %s from node", vol.Spec.Volume.Name)
if err := iscsiutils.UnmountAndDetachDisk(vol, vol.Spec.Volume.StagingTargetPath); err == nil {
vol.Finalizers = nil
logrus.Infof("Cleaning up cva %s", vol.Name)
if vol, err = UpdateCStorVolumeAttachmentCR(vol); err != nil {
logrus.Errorf(err.Error())
}
Expand All @@ -354,16 +385,16 @@ func CleanupOnRestart() {
}

TransitionVolListLock.Lock()
TransitionVolList[vol.Spec.Volume.Name] = apis.CStorVolumeAttachmentStatusUnmounted
delete(TransitionVolList, vol.Spec.Volume.Name)
TransitionVolListLock.Unlock()
}(&vol)
}
return
}

// IsVolumeReachable makes a TCP connection to target
// and checks if volume is Reachable
func IsVolumeReachable(targetPortal string) (bool, error) {
func IsVolumeReachable(volumeID, targetPortal string) (bool, error) {
var (
err error
conn net.Conn
Expand All @@ -372,12 +403,12 @@ func IsVolumeReachable(targetPortal string) (bool, error) {
// Create a connection to test if the iSCSI Portal is reachable,
if conn, err = net.Dial("tcp", targetPortal); err == nil {
conn.Close()
logrus.Infof("Volume is reachable to create connections")
logrus.Infof("Volume %s is reachable to create connections", volumeID)
return true, nil
}
logrus.Infof(
"iSCSI Target not reachable, TargetPortal %v, err:%v",
targetPortal, err,
"iSCSI Target not reachable, VolumeID: %s TargetPortal %v, err:%v",
volumeID, targetPortal, err,
)
return false, err
}
Expand All @@ -391,10 +422,10 @@ func IsVolumeReady(volumeID string) (bool, error) {
}
if volStatus == "Healthy" || volStatus == "Degraded" {
// In both healthy and degraded states the volume can serve IOs
logrus.Infof("Volume is ready to accept IOs")
logrus.Infof("Volume %s is ready to accept IOs", volumeID)
return true, nil
}
logrus.Infof("Volume is not ready: Replicas yet to connect to controller")
logrus.Infof("Volume %s is not ready: Replicas yet to connect to controller", volumeID)
return false, nil
}

Expand All @@ -410,7 +441,7 @@ func WaitForVolumeReadyAndReachable(vol *apis.CStorVolumeAttachment) error {
return err
}
// This function return after 12s in case the volume is not reachable
err = WaitForVolumeToBeReachable(vol.Spec.ISCSI.TargetPortal)
err = WaitForVolumeToBeReachable(vol.Spec.Volume.Name, vol.Spec.ISCSI.TargetPortal)
if err != nil {
logrus.Error(err)
return err
Expand Down Expand Up @@ -438,10 +469,10 @@ func RemountVolume(
options := []string{"rw"}

if ready, err := IsVolumeReady(vol.Spec.Volume.Name); err != nil || !ready {
return fmt.Errorf("Volume is not ready")
return fmt.Errorf("Volume %s is not ready", vol.Spec.Volume.Name)
}
if reachable, err := IsVolumeReachable(vol.Spec.ISCSI.TargetPortal); err != nil || !reachable {
return fmt.Errorf("Volume is not reachable")
if reachable, err := IsVolumeReachable(vol.Spec.Volume.Name, vol.Spec.ISCSI.TargetPortal); err != nil || !reachable {
return fmt.Errorf("Volume %s is not reachable", vol.Spec.Volume.Name)
}
if stagingPathExists {
mounter.Unmount(vol.Spec.Volume.StagingTargetPath)
Expand Down

0 comments on commit 313ddbc

Please sign in to comment.