Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cleanup): add cleanup of volumes from node to monitor func #142

Merged
merged 4 commits into from
Jan 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/gogo/protobuf v1.3.0 // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
github.com/googleapis/gnostic v0.3.1 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.8 // indirect
Expand Down
14 changes: 12 additions & 2 deletions pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"golang.org/x/sys/unix"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
k8serror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -206,13 +207,13 @@ func (ns *node) NodeStageVolume(
return nil, status.Error(codes.Internal, err.Error())
}

logrus.Info("NodeStageVolume: start format and mount operation")
logrus.Infof("NodeStageVolume %v: start format and mount operation", volumeID)
if err := ns.formatAndMount(req, devicePath); err != nil {
vol.Finalizers = nil
// There might still be a case that the attach was successful,
// therefore not cleaning up the staging path from CR
if _, uerr := utils.UpdateCStorVolumeAttachmentCR(vol); uerr != nil {
logrus.Errorf("Failed to update CStorVolumeAttachment:%v", uerr.Error())
logrus.Errorf("Failed to update cva for %s: %v", volumeID, uerr.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down Expand Up @@ -250,8 +251,13 @@ func (ns *node) NodeUnstageVolume(
defer removeVolumeFromTransitionList(volumeID)

if vol, err = utils.GetCStorVolumeAttachment(volumeID + "-" + utils.NodeIDENV); err != nil {
if k8serror.IsNotFound(err) {
logrus.Infof("cva for %s has already been deleted", volumeID)
return &csi.NodeUnstageVolumeResponse{}, nil
Copy link
Contributor

@prateekpandey14 prateekpandey14 Jan 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a log here before return saying CStorVolumeAttachment not found

logrus.Infof("CstorVolumeAttachment %s was deleted or cannot be found: %s", volumeID, err.Error())

}
return nil, status.Error(codes.Internal, err.Error())
}

if vol.Spec.Volume.StagingTargetPath == "" {
return &csi.NodeUnstageVolumeResponse{}, nil
}
Expand Down Expand Up @@ -359,6 +365,10 @@ func (ns *node) NodeUnpublishVolume(
}
vol, err := utils.GetCStorVolumeAttachment(volumeID + "-" + utils.NodeIDENV)
if err != nil {
if k8serror.IsNotFound(err) {
logrus.Infof("cva for %s has already been deleted", volumeID)
return &csi.NodeUnpublishVolumeResponse{}, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a log here saying CStorVolumeAttachment not found

logrus.Infof("CstorVolumeAttachment %s was deleted or cannot be found: %s", volumeID, err.Error())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, I will add this log. Besides I will also add logs while creating and deleting CVAs

}
return nil, status.Error(codes.Internal, err.Error())
}
vol.Spec.Volume.TargetPath = ""
Expand Down
8 changes: 2 additions & 6 deletions pkg/driver/node_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,6 @@ func (ns *node) prepareVolumeForNode(
volumeID := req.GetVolumeId()
nodeID := ns.driver.config.NodeID

if err := utils.PatchCVCNodeID(volumeID, nodeID); err != nil {
return err
}

labels := map[string]string{
"nodeID": nodeID,
"Volname": volumeID,
Expand Down Expand Up @@ -356,7 +352,7 @@ func (ns *node) prepareVolumeForNode(
} else if !isCVCBound {
utils.TransitionVolList[volumeID] = apis.CStorVolumeAttachmentStatusWaitingForCVCBound
time.Sleep(10 * time.Second)
return errors.New("Waiting for CVC to be bound")
return errors.Errorf("Waiting for %s's CVC to be bound", volumeID)
}

if err = utils.FetchAndUpdateISCSIDetails(volumeID, vol); err != nil {
Expand All @@ -368,7 +364,7 @@ func (ns *node) prepareVolumeForNode(
return err
} else if err == nil && oldvol != nil {
if oldvol.DeletionTimestamp != nil {
return errors.Errorf("Volume still mounted on node: %s", nodeID)
return errors.Errorf("Volume %s still mounted on node: %s", volumeID, nodeID)
}
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/driver/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ func New(config *config.Config) *CSIDriver {
driver.cs = NewController(driver)

case "node":
utils.CleanupOnRestart()
if err := utils.Cleanup(); err != nil {
logrus.Fatalf(err.Error())
}
// Start monitor goroutine to monitor the
// mounted paths. If a path goes down or
// becomes read only (in case of RW mount
Expand Down
4 changes: 2 additions & 2 deletions pkg/driver/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func addVolumeToTransitionList(volumeID string, status apis.CStorVolumeAttachmen
defer utils.TransitionVolListLock.Unlock()

if _, ok := utils.TransitionVolList[volumeID]; ok {
return fmt.Errorf("Volume Busy, status: %v",
utils.TransitionVolList[volumeID])
return fmt.Errorf("Volume %s Busy, status: %v",
volumeID, utils.TransitionVolList[volumeID])
}
utils.TransitionVolList[volumeID] = status
return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/utils/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
node "github.com/openebs/cstor-csi/pkg/kubernetes/node"
pv "github.com/openebs/cstor-csi/pkg/kubernetes/persistentvolume"
errors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -151,6 +152,7 @@ func DeleteOldCStorVolumeAttachmentCRs(volumeID string) error {
}

for _, csivol := range csivols.Items {
logrus.Infof("Marking cva %s for deletion", csivol.Name)
err = csivolume.NewKubeclient().
WithNamespace(OpenEBSNamespace).Delete(csivol.Name)
if err != nil {
Expand All @@ -165,6 +167,7 @@ func DeleteOldCStorVolumeAttachmentCRs(volumeID string) error {

// DeleteCStorVolumeAttachmentCR removes the CStorVolumeAttachmentCR for the specified path
func DeleteCStorVolumeAttachmentCR(csivolName string) error {
logrus.Infof("Deleting cva %s", csivolName)
return csivolume.NewKubeclient().
WithNamespace(OpenEBSNamespace).Delete(csivolName)
}
81 changes: 56 additions & 25 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"golang.org/x/net/context"

"github.com/google/uuid"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
apis "github.com/openebs/cstor-csi/pkg/apis/cstor/v1"
"github.com/openebs/cstor-csi/pkg/cstor/snapshot"
Expand Down Expand Up @@ -116,13 +117,27 @@ func logGRPC(
ctx context.Context, req interface{},
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (interface{}, error) {
logrus.Debugf("GRPC call: %s", info.FullMethod)
logrus.Debugf("GRPC request: %s", protosanitizer.StripSecrets(req))

var printlog bool
id := uuid.New()
if strings.Contains(info.FullMethod, "NodeGetCapabilities") ||
strings.Contains(info.FullMethod, "NodeGetVolumeStats") ||
strings.Contains(info.FullMethod, "ControllerGetCapabilities") ||
strings.Contains(info.FullMethod, "GetPluginInfo") ||
strings.Contains(info.FullMethod, "GetPluginCapabilities") ||
strings.Contains(info.FullMethod, "Probe") {
printlog = true
}
if !printlog {
logrus.Infof("Req %s: %s %s", id, info.FullMethod, protosanitizer.StripSecrets(req))
}
resp, err := handler(ctx, req)
if err != nil {
logrus.Errorf("GRPC error: %v", err)
} else {
logrus.Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
if !printlog {
logrus.Infof("Resp %s: %s", id, protosanitizer.StripSecrets(resp))
}
}
return resp, err
}
Expand All @@ -135,7 +150,7 @@ func ChmodMountPath(mountPath string) error {

// WaitForVolumeToBeReachable keeps the mounts on hold until the volume is
// reachable
func WaitForVolumeToBeReachable(targetPortal string) error {
func WaitForVolumeToBeReachable(volumeID string, targetPortal string) error {
var (
retries int
err error
Expand All @@ -146,7 +161,7 @@ func WaitForVolumeToBeReachable(targetPortal string) error {
// Create a connection to test if the iSCSI Portal is reachable,
if conn, err = net.Dial("tcp", targetPortal); err == nil {
conn.Close()
logrus.Infof("Volume is reachable to create connections")
logrus.Infof("Volume %s is reachable to create connections", volumeID)
return nil
}
// wait until the iSCSI targetPortal is reachable
Expand All @@ -160,8 +175,8 @@ func WaitForVolumeToBeReachable(targetPortal string) error {
// based on the kubelets retrying logic. Kubelet retries to publish
// volume after every 14s )
return fmt.Errorf(
"iSCSI Target not reachable, TargetPortal %v, err:%v",
targetPortal, err)
"iSCSI Target not reachable for %s, TargetPortal %v, err:%v",
volumeID, targetPortal, err)
}
}
}
Expand All @@ -177,14 +192,15 @@ checkVolumeStatus:
return err
} else if volStatus == "Healthy" || volStatus == "Degraded" {
// In both healthy and degraded states the volume can serve IOs
logrus.Infof("Volume is ready to accept IOs")
logrus.Infof("Volume %s is ready to accept IOs", volumeID)
} else if retries >= VolumeWaitRetryCount {
// Let the caller function decide further if the volume is still not
// ready to accdept IOs after 12 seconds ( This number was arrived at
// based on the kubelets retrying logic. Kubelet retries to publish
// volume after every 14s )
return fmt.Errorf(
"Volume is not ready: Replicas yet to connect to controller",
"Volume %s is not ready: Replicas yet to connect to controller",
volumeID,
)
} else {
TransitionVolList[volumeID] = apis.CStorVolumeAttachmentStatusWaitingForVolumeToBeReady
Expand Down Expand Up @@ -238,6 +254,7 @@ func MonitorMounts() {
mounter := mount.New("")
ticker := time.NewTicker(MonitorMountRetryTimeout * time.Second)
for {
cleanupRequired := false
select {
case <-ticker.C:
// Get list of mounted paths present with the node
Expand All @@ -253,6 +270,7 @@ func MonitorMounts() {
for _, vol := range csivolList.Items {
// ignore monitoring for volumes with deletion timestamp set
if vol.DeletionTimestamp != nil {
cleanupRequired = true
continue
}
// ignore monitoring the mount for a block device
Expand Down Expand Up @@ -318,22 +336,33 @@ func MonitorMounts() {
}
TransitionVolListLock.Unlock()
}
if cleanupRequired {
Cleanup()
}
}
}

// CleanupOnRestart unmounts and detaches the volumes having
// Cleanup unmounts and detaches the volumes having
// DeletionTimestamp set and removes finalizers from the
// corresponding CStorVolumeAttachment CRs
func CleanupOnRestart() {
func Cleanup() (err error) {
var (
err error
csivolList *apis.CStorVolumeAttachmentList
)
// Get list of mounted paths present with the node
TransitionVolListLock.Lock()
defer TransitionVolListLock.Unlock()
if csivolList, err = GetVolListForNode(); err != nil {
return
count := 0
for {
if csivolList, err = GetVolListForNode(); err == nil {
Copy link
Contributor

@shubham14bajpai shubham14bajpai Jan 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this error out more than 5 times then the list will be empty and the error will not be logged as well. Can we log the error outside the for loop in that case?

Copy link
Contributor Author

@payes payes Jan 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it errors 5 times, we are returning an error to the caller function. If this func returns an error during startup, we will fatal out else we ignore the error. Will add one extra log if it errors 5 times.

break
}
time.Sleep(time.Second)
count++
if count == 5 {
logrus.Errorf("Failed to get cva list, err: %v", err)
return
}
}
for _, Vol := range csivolList.Items {
if Vol.DeletionTimestamp == nil {
Expand All @@ -344,8 +373,10 @@ func CleanupOnRestart() {
// This is being run in a go routine so that if unmount and detach
// commands take time, the startup is not delayed
go func(vol *apis.CStorVolumeAttachment) {
logrus.Infof("Cleaning up %s from node", vol.Spec.Volume.Name)
if err := iscsiutils.UnmountAndDetachDisk(vol, vol.Spec.Volume.StagingTargetPath); err == nil {
vol.Finalizers = nil
logrus.Infof("Cleaning up cva %s", vol.Name)
if vol, err = UpdateCStorVolumeAttachmentCR(vol); err != nil {
logrus.Errorf(err.Error())
}
Expand All @@ -354,16 +385,16 @@ func CleanupOnRestart() {
}

TransitionVolListLock.Lock()
TransitionVolList[vol.Spec.Volume.Name] = apis.CStorVolumeAttachmentStatusUnmounted
delete(TransitionVolList, vol.Spec.Volume.Name)
TransitionVolListLock.Unlock()
}(&vol)
}
return
}

// IsVolumeReachable makes a TCP connection to target
// and checks if volume is Reachable
func IsVolumeReachable(targetPortal string) (bool, error) {
func IsVolumeReachable(volumeID, targetPortal string) (bool, error) {
var (
err error
conn net.Conn
Expand All @@ -372,12 +403,12 @@ func IsVolumeReachable(targetPortal string) (bool, error) {
// Create a connection to test if the iSCSI Portal is reachable,
if conn, err = net.Dial("tcp", targetPortal); err == nil {
conn.Close()
logrus.Infof("Volume is reachable to create connections")
logrus.Infof("Volume %s is reachable to create connections", volumeID)
return true, nil
}
logrus.Infof(
"iSCSI Target not reachable, TargetPortal %v, err:%v",
targetPortal, err,
"iSCSI Target not reachable, VolumeID: %s TargetPortal %v, err:%v",
volumeID, targetPortal, err,
)
return false, err
}
Expand All @@ -391,10 +422,10 @@ func IsVolumeReady(volumeID string) (bool, error) {
}
if volStatus == "Healthy" || volStatus == "Degraded" {
// In both healthy and degraded states the volume can serve IOs
logrus.Infof("Volume is ready to accept IOs")
logrus.Infof("Volume %s is ready to accept IOs", volumeID)
return true, nil
}
logrus.Infof("Volume is not ready: Replicas yet to connect to controller")
logrus.Infof("Volume %s is not ready: Replicas yet to connect to controller", volumeID)
return false, nil
}

Expand All @@ -410,7 +441,7 @@ func WaitForVolumeReadyAndReachable(vol *apis.CStorVolumeAttachment) error {
return err
}
// This function return after 12s in case the volume is not reachable
err = WaitForVolumeToBeReachable(vol.Spec.ISCSI.TargetPortal)
err = WaitForVolumeToBeReachable(vol.Spec.Volume.Name, vol.Spec.ISCSI.TargetPortal)
if err != nil {
logrus.Error(err)
return err
Expand Down Expand Up @@ -438,10 +469,10 @@ func RemountVolume(
options := []string{"rw"}

if ready, err := IsVolumeReady(vol.Spec.Volume.Name); err != nil || !ready {
return fmt.Errorf("Volume is not ready")
return fmt.Errorf("Volume %s is not ready", vol.Spec.Volume.Name)
}
if reachable, err := IsVolumeReachable(vol.Spec.ISCSI.TargetPortal); err != nil || !reachable {
return fmt.Errorf("Volume is not reachable")
if reachable, err := IsVolumeReachable(vol.Spec.Volume.Name, vol.Spec.ISCSI.TargetPortal); err != nil || !reachable {
return fmt.Errorf("Volume %s is not reachable", vol.Spec.Volume.Name)
}
if stagingPathExists {
mounter.Unmount(vol.Spec.Volume.StagingTargetPath)
Expand Down