From a38f38a520778ead3f307114504fc92fd46bca35 Mon Sep 17 00:00:00 2001
From: Payes Anand
Date: Mon, 11 Jan 2021 15:03:54 +0530
Subject: [PATCH] fix(cleanup): add cleanup of volumes from node to monitor
 func (#142) (#143)

Signed-off-by: Payes Anand
---
 go.mod                   |  1 +
 pkg/driver/node.go       | 14 ++++++-
 pkg/driver/node_utils.go |  8 +---
 pkg/driver/service.go    |  4 +-
 pkg/driver/utils.go      |  4 +-
 pkg/utils/kubernetes.go  |  3 ++
 pkg/utils/utils.go       | 81 +++++++++++++++++++++++++++-------------
 7 files changed, 79 insertions(+), 36 deletions(-)

diff --git a/go.mod b/go.mod
index 2e156a6a5..3e62fe2d1 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
 	github.com/gogo/protobuf v1.3.0 // indirect
 	github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
 	github.com/golang/protobuf v1.3.2
+	github.com/google/uuid v1.1.1
 	github.com/googleapis/gnostic v0.3.1 // indirect
 	github.com/hashicorp/golang-lru v0.5.3 // indirect
 	github.com/imdario/mergo v0.3.8 // indirect
diff --git a/pkg/driver/node.go b/pkg/driver/node.go
index 0f673c2a5..f9c79b16e 100644
--- a/pkg/driver/node.go
+++ b/pkg/driver/node.go
@@ -32,6 +32,7 @@ import (
 	"golang.org/x/sys/unix"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	k8serror "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
@@ -206,13 +207,13 @@ func (ns *node) NodeStageVolume(
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
-	logrus.Info("NodeStageVolume: start format and mount operation")
+	logrus.Infof("NodeStageVolume %v: start format and mount operation", volumeID)
 	if err := ns.formatAndMount(req, devicePath); err != nil {
 		vol.Finalizers = nil
 		// There might still be a case that the attach was successful,
 		// therefore not cleaning up the staging path from CR
 		if _, uerr := utils.UpdateCStorVolumeAttachmentCR(vol); uerr != nil {
-			logrus.Errorf("Failed to update CStorVolumeAttachment:%v", uerr.Error())
+			logrus.Errorf("Failed to update cva for %s: %v", volumeID, uerr.Error())
 		}
 		return nil, status.Error(codes.Internal, err.Error())
 	}
@@ -250,8 +251,13 @@ func (ns *node) NodeUnstageVolume(
 	defer removeVolumeFromTransitionList(volumeID)
 
 	if vol, err = utils.GetCStorVolumeAttachment(volumeID + "-" + utils.NodeIDENV); err != nil {
+		if k8serror.IsNotFound(err) {
+			logrus.Infof("cva for %s has already been deleted", volumeID)
+			return &csi.NodeUnstageVolumeResponse{}, nil
+		}
 		return nil, status.Error(codes.Internal, err.Error())
 	}
+
 	if vol.Spec.Volume.StagingTargetPath == "" {
 		return &csi.NodeUnstageVolumeResponse{}, nil
 	}
@@ -359,6 +365,10 @@ func (ns *node) NodeUnpublishVolume(
 	}
 	vol, err := utils.GetCStorVolumeAttachment(volumeID + "-" + utils.NodeIDENV)
 	if err != nil {
+		if k8serror.IsNotFound(err) {
+			logrus.Infof("cva for %s has already been deleted", volumeID)
+			return &csi.NodeUnpublishVolumeResponse{}, nil
+		}
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 	vol.Spec.Volume.TargetPath = ""
diff --git a/pkg/driver/node_utils.go b/pkg/driver/node_utils.go
index ea2d98697..65c476e06 100644
--- a/pkg/driver/node_utils.go
+++ b/pkg/driver/node_utils.go
@@ -323,10 +323,6 @@ func (ns *node) prepareVolumeForNode(
 	volumeID := req.GetVolumeId()
 	nodeID := ns.driver.config.NodeID
 
-	if err := utils.PatchCVCNodeID(volumeID, nodeID); err != nil {
-		return err
-	}
-
 	labels := map[string]string{
 		"nodeID":  nodeID,
 		"Volname": volumeID,
 	}
@@ -356,7 +352,7 @@ func (ns *node) prepareVolumeForNode(
 	} else if !isCVCBound {
 		utils.TransitionVolList[volumeID] = apis.CStorVolumeAttachmentStatusWaitingForCVCBound
 		time.Sleep(10 * time.Second)
-		return errors.New("Waiting for CVC to be bound")
+		return errors.Errorf("Waiting for %s's CVC to be bound", volumeID)
 	}
 
 	if err = utils.FetchAndUpdateISCSIDetails(volumeID, vol); err != nil {
@@ -368,7 +364,7 @@ func (ns *node) prepareVolumeForNode(
 		return err
 	} else if err == nil && oldvol != nil {
 		if oldvol.DeletionTimestamp != nil {
-			return errors.Errorf("Volume still mounted on node: %s", nodeID)
+			return errors.Errorf("Volume %s still mounted on node: %s", volumeID, nodeID)
 		}
 		return nil
 	}
diff --git a/pkg/driver/service.go b/pkg/driver/service.go
index 533e1db8c..324639b94 100644
--- a/pkg/driver/service.go
+++ b/pkg/driver/service.go
@@ -87,7 +87,9 @@ func New(config *config.Config) *CSIDriver {
 		driver.cs = NewController(driver)
 
 	case "node":
-		utils.CleanupOnRestart()
+		if err := utils.Cleanup(); err != nil {
+			logrus.Fatalf(err.Error())
+		}
 		// Start monitor goroutine to monitor the
 		// mounted paths. If a path goes down or
 		// becomes read only (in case of RW mount
diff --git a/pkg/driver/utils.go b/pkg/driver/utils.go
index 8c9f770e1..80f8e4e15 100644
--- a/pkg/driver/utils.go
+++ b/pkg/driver/utils.go
@@ -95,8 +95,8 @@ func addVolumeToTransitionList(volumeID string, status apis.CStorVolumeAttachmen
 	defer utils.TransitionVolListLock.Unlock()
 
 	if _, ok := utils.TransitionVolList[volumeID]; ok {
-		return fmt.Errorf("Volume Busy, status: %v",
-			utils.TransitionVolList[volumeID])
+		return fmt.Errorf("Volume %s Busy, status: %v",
+			volumeID, utils.TransitionVolList[volumeID])
 	}
 	utils.TransitionVolList[volumeID] = status
 	return nil
diff --git a/pkg/utils/kubernetes.go b/pkg/utils/kubernetes.go
index 7fe1ed58b..75e3ee8ca 100644
--- a/pkg/utils/kubernetes.go
+++ b/pkg/utils/kubernetes.go
@@ -21,6 +21,7 @@ import (
 	node "github.com/openebs/cstor-csi/pkg/kubernetes/node"
 	pv "github.com/openebs/cstor-csi/pkg/kubernetes/persistentvolume"
 	errors "github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
@@ -151,6 +152,7 @@ func DeleteOldCStorVolumeAttachmentCRs(volumeID string) error {
 	}
 
 	for _, csivol := range csivols.Items {
+		logrus.Infof("Marking cva %s for deletion", csivol.Name)
 		err = csivolume.NewKubeclient().
 			WithNamespace(OpenEBSNamespace).Delete(csivol.Name)
 		if err != nil {
@@ -165,6 +167,7 @@ func DeleteOldCStorVolumeAttachmentCRs(volumeID string) error {
 
 // DeleteCStorVolumeAttachmentCR removes the CStorVolumeAttachmentCR for the specified path
 func DeleteCStorVolumeAttachmentCR(csivolName string) error {
+	logrus.Infof("Deleting cva %s", csivolName)
 	return csivolume.NewKubeclient().
 		WithNamespace(OpenEBSNamespace).Delete(csivolName)
 }
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 17fe4281d..b1911bd2b 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -25,6 +25,7 @@ import (
 
 	"golang.org/x/net/context"
 
+	"github.com/google/uuid"
 	"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
 	apis "github.com/openebs/cstor-csi/pkg/apis/cstor/v1"
 	"github.com/openebs/cstor-csi/pkg/cstor/snapshot"
@@ -116,13 +117,27 @@ func logGRPC(
 	ctx context.Context, req interface{},
 	info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
 ) (interface{}, error) {
-	logrus.Debugf("GRPC call: %s", info.FullMethod)
-	logrus.Debugf("GRPC request: %s", protosanitizer.StripSecrets(req))
+
+	var printlog bool
+	id := uuid.New()
+	if strings.Contains(info.FullMethod, "NodeGetCapabilities") ||
+		strings.Contains(info.FullMethod, "NodeGetVolumeStats") ||
+		strings.Contains(info.FullMethod, "ControllerGetCapabilities") ||
+		strings.Contains(info.FullMethod, "GetPluginInfo") ||
+		strings.Contains(info.FullMethod, "GetPluginCapabilities") ||
+		strings.Contains(info.FullMethod, "Probe") {
+		printlog = true
+	}
+	if !printlog {
+		logrus.Infof("Req %s: %s %s", id, info.FullMethod, protosanitizer.StripSecrets(req))
+	}
 	resp, err := handler(ctx, req)
 	if err != nil {
 		logrus.Errorf("GRPC error: %v", err)
 	} else {
-		logrus.Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
+		if !printlog {
+			logrus.Infof("Resp %s: %s", id, protosanitizer.StripSecrets(resp))
+		}
 	}
 	return resp, err
 }
@@ -135,7 +150,7 @@ func ChmodMountPath(mountPath string) error {
 
 // WaitForVolumeToBeReachable keeps the mounts on hold until the volume is
 // reachable
-func WaitForVolumeToBeReachable(targetPortal string) error {
+func WaitForVolumeToBeReachable(volumeID string, targetPortal string) error {
 	var (
 		retries int
 		err     error
@@ -146,7 +161,7 @@ func WaitForVolumeToBeReachable(targetPortal string) error {
 		// Create a connection to test if the iSCSI Portal is reachable,
 		if conn, err = net.Dial("tcp", targetPortal); err == nil {
 			conn.Close()
-			logrus.Infof("Volume is reachable to create connections")
+			logrus.Infof("Volume %s is reachable to create connections", volumeID)
 			return nil
 		}
 		// wait until the iSCSI targetPortal is reachable
@@ -160,8 +175,8 @@ func WaitForVolumeToBeReachable(targetPortal string) error {
 			// based on the kubelets retrying logic. Kubelet retries to publish
 			// volume after every 14s )
 			return fmt.Errorf(
-				"iSCSI Target not reachable, TargetPortal %v, err:%v",
-				targetPortal, err)
+				"iSCSI Target not reachable for %s, TargetPortal %v, err:%v",
+				volumeID, targetPortal, err)
 		}
 	}
 }
@@ -177,14 +192,15 @@ checkVolumeStatus:
 		return err
 	} else if volStatus == "Healthy" || volStatus == "Degraded" {
 		// In both healthy and degraded states the volume can serve IOs
-		logrus.Infof("Volume is ready to accept IOs")
+		logrus.Infof("Volume %s is ready to accept IOs", volumeID)
 	} else if retries >= VolumeWaitRetryCount {
 		// Let the caller function decide further if the volume is still not
 		// ready to accdept IOs after 12 seconds ( This number was arrived at
 		// based on the kubelets retrying logic. Kubelet retries to publish
 		// volume after every 14s )
 		return fmt.Errorf(
-			"Volume is not ready: Replicas yet to connect to controller",
+			"Volume %s is not ready: Replicas yet to connect to controller",
+			volumeID,
 		)
 	} else {
 		TransitionVolList[volumeID] = apis.CStorVolumeAttachmentStatusWaitingForVolumeToBeReady
@@ -238,6 +254,7 @@ func MonitorMounts() {
 	mounter := mount.New("")
 	ticker := time.NewTicker(MonitorMountRetryTimeout * time.Second)
 	for {
+		cleanupRequired := false
 		select {
 		case <-ticker.C:
 			// Get list of mounted paths present with the node
@@ -253,6 +270,7 @@ func MonitorMounts() {
 			for _, vol := range csivolList.Items {
 				// ignore monitoring for volumes with deletion timestamp set
 				if vol.DeletionTimestamp != nil {
+					cleanupRequired = true
 					continue
 				}
 				// ignore monitoring the mount for a block device
@@ -318,22 +336,33 @@ func MonitorMounts() {
 			}
 			TransitionVolListLock.Unlock()
 		}
+		if cleanupRequired {
+			Cleanup()
+		}
 	}
 }
 
-// CleanupOnRestart unmounts and detaches the volumes having
+// Cleanup unmounts and detaches the volumes having
 // DeletionTimestamp set and removes finalizers from the
 // corresponding CStorVolumeAttachment CRs
-func CleanupOnRestart() {
+func Cleanup() (err error) {
 	var (
-		err        error
 		csivolList *apis.CStorVolumeAttachmentList
 	)
 	// Get list of mounted paths present with the node
 	TransitionVolListLock.Lock()
 	defer TransitionVolListLock.Unlock()
-	if csivolList, err = GetVolListForNode(); err != nil {
-		return
+	count := 0
+	for {
+		if csivolList, err = GetVolListForNode(); err == nil {
+			break
+		}
+		time.Sleep(time.Second)
+		count++
+		if count == 5 {
+			logrus.Errorf("Failed to get cva list, err: %v", err)
+			return
+		}
 	}
 	for _, Vol := range csivolList.Items {
 		if Vol.DeletionTimestamp == nil {
@@ -344,8 +373,10 @@ func CleanupOnRestart() {
 		// This is being run in a go routine so that if unmount and detach
 		// commands take time, the startup is not delayed
 		go func(vol *apis.CStorVolumeAttachment) {
+			logrus.Infof("Cleaning up %s from node", vol.Spec.Volume.Name)
 			if err := iscsiutils.UnmountAndDetachDisk(vol, vol.Spec.Volume.StagingTargetPath); err == nil {
 				vol.Finalizers = nil
+				logrus.Infof("Cleaning up cva %s", vol.Name)
 				if vol, err = UpdateCStorVolumeAttachmentCR(vol); err != nil {
 					logrus.Errorf(err.Error())
 				}
@@ -354,16 +385,16 @@ func CleanupOnRestart() {
 			TransitionVolListLock.Lock()
-			TransitionVolList[vol.Spec.Volume.Name] = apis.CStorVolumeAttachmentStatusUnmounted
 			delete(TransitionVolList, vol.Spec.Volume.Name)
 			TransitionVolListLock.Unlock()
 		}(&vol)
 	}
+	return
 }
 
 // IsVolumeReachable makes a TCP connection to target
 // and checks if volume is Reachable
-func IsVolumeReachable(targetPortal string) (bool, error) {
+func IsVolumeReachable(volumeID, targetPortal string) (bool, error) {
 	var (
 		err  error
 		conn net.Conn
 	)
 
 	// Create a connection to test if the iSCSI Portal is reachable,
 	if conn, err = net.Dial("tcp", targetPortal); err == nil {
 		conn.Close()
-		logrus.Infof("Volume is reachable to create connections")
+		logrus.Infof("Volume %s is reachable to create connections", volumeID)
 		return true, nil
 	}
 	logrus.Infof(
-		"iSCSI Target not reachable, TargetPortal %v, err:%v",
-		targetPortal, err,
+		"iSCSI Target not reachable, VolumeID: %s TargetPortal %v, err:%v",
+		volumeID, targetPortal, err,
 	)
 	return false, err
 }
@@ -391,10 +422,10 @@ func IsVolumeReady(volumeID string) (bool, error) {
 	}
 
 	if volStatus == "Healthy" || volStatus == "Degraded" {
 		// In both healthy and degraded states the volume can serve IOs
-		logrus.Infof("Volume is ready to accept IOs")
+		logrus.Infof("Volume %s is ready to accept IOs", volumeID)
 		return true, nil
 	}
-	logrus.Infof("Volume is not ready: Replicas yet to connect to controller")
+	logrus.Infof("Volume %s is not ready: Replicas yet to connect to controller", volumeID)
 	return false, nil
 }
@@ -410,7 +441,7 @@ func WaitForVolumeReadyAndReachable(vol *apis.CStorVolumeAttachment) error {
 		return err
 	}
 	// This function return after 12s in case the volume is not reachable
-	err = WaitForVolumeToBeReachable(vol.Spec.ISCSI.TargetPortal)
+	err = WaitForVolumeToBeReachable(vol.Spec.Volume.Name, vol.Spec.ISCSI.TargetPortal)
 	if err != nil {
 		logrus.Error(err)
 		return err
@@ -438,10 +469,10 @@ func RemountVolume(
 	options := []string{"rw"}
 
 	if ready, err := IsVolumeReady(vol.Spec.Volume.Name); err != nil || !ready {
-		return fmt.Errorf("Volume is not ready")
+		return fmt.Errorf("Volume %s is not ready", vol.Spec.Volume.Name)
 	}
-	if reachable, err := IsVolumeReachable(vol.Spec.ISCSI.TargetPortal); err != nil || !reachable {
-		return fmt.Errorf("Volume is not reachable")
+	if reachable, err := IsVolumeReachable(vol.Spec.Volume.Name, vol.Spec.ISCSI.TargetPortal); err != nil || !reachable {
+		return fmt.Errorf("Volume %s is not reachable", vol.Spec.Volume.Name)
 	}
 	if stagingPathExists {
 		mounter.Unmount(vol.Spec.Volume.StagingTargetPath)